Posted to commits@spark.apache.org by ge...@apache.org on 2023/02/02 05:53:26 UTC

[spark] branch master updated: [SPARK-38829][SQL] Introduce conf spark.sql.parquet.inferTimestampNTZ.enabled for TimestampNTZ inference on Parquet

This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new ae24327e898 [SPARK-38829][SQL] Introduce conf spark.sql.parquet.inferTimestampNTZ.enabled for TimestampNTZ inference on Parquet
ae24327e898 is described below

commit ae24327e8984e7ee8ba948e3be0b6b2f28df68d0
Author: Gengliang Wang <ge...@apache.org>
AuthorDate: Wed Feb 1 21:53:12 2023 -0800

    [SPARK-38829][SQL] Introduce conf spark.sql.parquet.inferTimestampNTZ.enabled for TimestampNTZ inference on Parquet
    
    ### What changes were proposed in this pull request?
    
    Introduces conf `spark.sql.parquet.inferTimestampNTZ.enabled` for TimestampNTZ inference on Parquet. When enabled, Parquet timestamp columns with annotation isAdjustedToUTC = false are inferred as TIMESTAMP_NTZ type during schema inference. Otherwise, all the Parquet timestamp columns are inferred as TIMESTAMP_LTZ types. Note that Spark writes the output schema into Parquet's footer metadata on file writing and leverages it on file reading. Thus this configuration only affects the schema inference on Parquet files which are not written by Spark.
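    For illustration, a minimal sketch of the read-side behavior (run e.g. in spark-shell, where `spark` is the active SparkSession), assuming an external, non-Spark writer produced `/tmp/events.parquet` with a timestamp column annotated isAdjustedToUTC = false; the path and column name are hypothetical:

        // Hypothetical file written by an external tool; the column `event_time`
        // carries the Parquet annotation isAdjustedToUTC = false.
        val path = "/tmp/events.parquet"

        // With inference enabled (the default), the column is inferred as TIMESTAMP_NTZ.
        spark.conf.set("spark.sql.parquet.inferTimestampNTZ.enabled", "true")
        spark.read.parquet(path).printSchema()
        // root
        //  |-- event_time: timestamp_ntz (nullable = true)

        // With inference disabled, every Parquet timestamp column is inferred as TIMESTAMP_LTZ.
        spark.conf.set("spark.sql.parquet.inferTimestampNTZ.enabled", "false")
        spark.read.parquet(path).printSchema()
        // root
        //  |-- event_time: timestamp (nullable = true)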
    
    This PR also removes the configuration `spark.sql.parquet.timestampNTZ.enabled`, which was too heavy-handed: when set to false, Spark could not write TimestampNTZ columns to Parquet files at all.
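    The write path no longer depends on any flag; a minimal sketch mirroring the updated ParquetIOSuite test (the output path is hypothetical):

        import java.time.LocalDateTime
        import spark.implicits._

        // Succeeds regardless of spark.sql.parquet.inferTimestampNTZ.enabled;
        // the TIMESTAMP_NTZ column is written with isAdjustedToUTC = false.
        Seq(LocalDateTime.parse("2021-01-01T00:00:00")).toDF("col")
          .write.parquet("/tmp/ntz_out")

        // Reading the file back recovers TimestampNTZType from the footer metadata
        // Spark wrote, independent of the inference flag.
        spark.read.parquet("/tmp/ntz_out").printSchema()
        // root
        //  |-- col: timestamp_ntz (nullable = true)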

    ### Why are the changes needed?
    
    The only compatibility concern in supporting TimestampNTZ for the Parquet data source is schema inference on files that were not written by Spark, so a single flag controlling that inference is sufficient.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No, TimestampNTZ is not released yet.

    ### How was this patch tested?
    
    UT
    
    Closes #39856 from gengliangwang/inferParuqetTimestampNTZ.
    
    Authored-by: Gengliang Wang <ge...@apache.org>
    Signed-off-by: Gengliang Wang <ge...@apache.org>
---
 .../org/apache/spark/sql/internal/SQLConf.scala    | 17 +++---
 .../parquet/SpecificParquetRecordReaderBase.java   |  4 +-
 .../datasources/parquet/ParquetFileFormat.scala    | 10 ++--
 .../datasources/parquet/ParquetReadSupport.scala   | 62 ++++++++--------------
 .../parquet/ParquetSchemaConverter.scala           | 24 ++++-----
 .../datasources/parquet/ParquetUtils.scala         |  4 --
 .../datasources/v2/parquet/ParquetScan.scala       |  4 +-
 .../parquet/ParquetFieldIdSchemaSuite.scala        |  6 +--
 .../datasources/parquet/ParquetIOSuite.scala       | 40 ++++++--------
 .../datasources/parquet/ParquetSchemaSuite.scala   | 27 +++++-----
 10 files changed, 81 insertions(+), 117 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index f407dda009c..1cc3b61b834 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1129,13 +1129,14 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
 
-  val PARQUET_TIMESTAMP_NTZ_ENABLED =
-    buildConf("spark.sql.parquet.timestampNTZ.enabled")
-      .doc(s"Enables ${TimestampTypes.TIMESTAMP_NTZ} support for Parquet reads and writes. " +
-        s"When enabled, ${TimestampTypes.TIMESTAMP_NTZ} values are written as Parquet timestamp " +
-        "columns with annotation isAdjustedToUTC = false and are inferred in a similar way. " +
-        s"When disabled, such values are read as ${TimestampTypes.TIMESTAMP_LTZ} and have to be " +
-        s"converted to ${TimestampTypes.TIMESTAMP_LTZ} for writes.")
+  val PARQUET_INFER_TIMESTAMP_NTZ_ENABLED =
+    buildConf("spark.sql.parquet.inferTimestampNTZ.enabled")
+      .doc("When enabled, Parquet timestamp columns with annotation isAdjustedToUTC = false " +
+        "are inferred as TIMESTAMP_NTZ type during schema inference. Otherwise, all the Parquet " +
+        "timestamp columns are inferred as TIMESTAMP_LTZ types. Note that Spark writes the " +
+        "output schema into Parquet's footer metadata on file writing and leverages it on file " +
+        "reading. Thus this configuration only affects the schema inference on Parquet files " +
+        "which are not written by Spark.")
       .version("3.4.0")
       .booleanConf
       .createWithDefault(true)
@@ -4943,7 +4944,7 @@ class SQLConf extends Serializable with Logging {
 
   def ignoreMissingParquetFieldId: Boolean = getConf(SQLConf.IGNORE_MISSING_PARQUET_FIELD_ID)
 
-  def parquetTimestampNTZEnabled: Boolean = getConf(PARQUET_TIMESTAMP_NTZ_ENABLED)
+  def parquetInferTimestampNTZEnabled: Boolean = getConf(PARQUET_INFER_TIMESTAMP_NTZ_ENABLED)
 
   def useV1Command: Boolean = getConf(SQLConf.LEGACY_USE_V1_COMMAND)
 
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
index 6ea1a0c37b1..b14f329b413 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
@@ -149,7 +149,7 @@ public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Vo
     config.setBoolean(SQLConf.PARQUET_BINARY_AS_STRING().key() , false);
     config.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP().key(), false);
     config.setBoolean(SQLConf.CASE_SENSITIVE().key(), false);
-    config.setBoolean(SQLConf.PARQUET_TIMESTAMP_NTZ_ENABLED().key(), false);
+    config.setBoolean(SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED().key(), false);
 
     this.file = new Path(path);
     long length = this.file.getFileSystem(config).getFileStatus(this.file).getLen();
@@ -200,7 +200,7 @@ public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Vo
     config.setBoolean(SQLConf.PARQUET_BINARY_AS_STRING().key() , false);
     config.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP().key(), false);
     config.setBoolean(SQLConf.CASE_SENSITIVE().key(), false);
-    config.setBoolean(SQLConf.PARQUET_TIMESTAMP_NTZ_ENABLED().key(), false);
+    config.setBoolean(SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED().key(), false);
     this.parquetColumn = new ParquetToSparkSchemaConverter(config)
       .convertParquetColumn(requestedSchema, Option.empty());
     this.sparkSchema = (StructType) parquetColumn.sparkType();
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 6b4651e3260..afa00aa6f37 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -151,8 +151,8 @@ class ParquetFileFormat
       SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
       sparkSession.sessionState.conf.isParquetINT96AsTimestamp)
     hadoopConf.setBoolean(
-      SQLConf.PARQUET_TIMESTAMP_NTZ_ENABLED.key,
-      sparkSession.sessionState.conf.parquetTimestampNTZEnabled)
+      SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED.key,
+      sparkSession.sessionState.conf.parquetInferTimestampNTZEnabled)
 
     val broadcastedHadoopConf =
       sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
@@ -357,7 +357,7 @@ object ParquetFileFormat extends Logging {
     val converter = new ParquetToSparkSchemaConverter(
       sparkSession.sessionState.conf.isParquetBinaryAsString,
       sparkSession.sessionState.conf.isParquetINT96AsTimestamp,
-      timestampNTZEnabled = sparkSession.sessionState.conf.parquetTimestampNTZEnabled)
+      inferTimestampNTZ = sparkSession.sessionState.conf.parquetInferTimestampNTZEnabled)
 
     val seen = mutable.HashSet[String]()
     val finalSchemas: Seq[StructType] = footers.flatMap { footer =>
@@ -453,14 +453,14 @@ object ParquetFileFormat extends Logging {
       sparkSession: SparkSession): Option[StructType] = {
     val assumeBinaryIsString = sparkSession.sessionState.conf.isParquetBinaryAsString
     val assumeInt96IsTimestamp = sparkSession.sessionState.conf.isParquetINT96AsTimestamp
-    val timestampNTZEnabled = sparkSession.sessionState.conf.parquetTimestampNTZEnabled
+    val inferTimestampNTZ = sparkSession.sessionState.conf.parquetInferTimestampNTZEnabled
 
     val reader = (files: Seq[FileStatus], conf: Configuration, ignoreCorruptFiles: Boolean) => {
       // Converter used to convert Parquet `MessageType` to Spark SQL `StructType`
       val converter = new ParquetToSparkSchemaConverter(
         assumeBinaryIsString = assumeBinaryIsString,
         assumeInt96IsTimestamp = assumeInt96IsTimestamp,
-        timestampNTZEnabled = timestampNTZEnabled)
+        inferTimestampNTZ = inferTimestampNTZ)
 
       readParquetFootersInParallel(conf, files, ignoreCorruptFiles)
         .map(ParquetFileFormat.readSchemaFromFooter(_, converter))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
index 1d35e9ea049..6e29afce491 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
@@ -130,8 +130,8 @@ object ParquetReadSupport extends Logging {
       SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.defaultValue.get)
     val useFieldId = conf.getBoolean(SQLConf.PARQUET_FIELD_ID_READ_ENABLED.key,
       SQLConf.PARQUET_FIELD_ID_READ_ENABLED.defaultValue.get)
-    val timestampNTZEnabled = conf.getBoolean(SQLConf.PARQUET_TIMESTAMP_NTZ_ENABLED.key,
-      SQLConf.PARQUET_TIMESTAMP_NTZ_ENABLED.defaultValue.get)
+    val inferTimestampNTZ = conf.getBoolean(SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED.key,
+      SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED.defaultValue.get)
     val ignoreMissingIds = conf.getBoolean(SQLConf.IGNORE_MISSING_PARQUET_FIELD_ID.key,
       SQLConf.IGNORE_MISSING_PARQUET_FIELD_ID.defaultValue.get)
 
@@ -152,7 +152,7 @@ object ParquetReadSupport extends Logging {
            |""".stripMargin)
     }
     val parquetClippedSchema = ParquetReadSupport.clipParquetSchema(parquetFileSchema,
-      catalystRequestedSchema, caseSensitive, useFieldId, timestampNTZEnabled)
+      catalystRequestedSchema, caseSensitive, useFieldId)
 
     // We pass two schema to ParquetRecordMaterializer:
     // - parquetRequestedSchema: the schema of the file data we want to read
@@ -194,10 +194,9 @@ object ParquetReadSupport extends Logging {
       parquetSchema: MessageType,
       catalystSchema: StructType,
       caseSensitive: Boolean,
-      useFieldId: Boolean,
-      timestampNTZEnabled: Boolean): MessageType = {
+      useFieldId: Boolean): MessageType = {
     val clippedParquetFields = clipParquetGroupFields(
-      parquetSchema.asGroupType(), catalystSchema, caseSensitive, useFieldId, timestampNTZEnabled)
+      parquetSchema.asGroupType(), catalystSchema, caseSensitive, useFieldId)
     if (clippedParquetFields.isEmpty) {
       ParquetSchemaConverter.EMPTY_MESSAGE
     } else {
@@ -212,25 +211,21 @@ object ParquetReadSupport extends Logging {
       parquetType: Type,
       catalystType: DataType,
       caseSensitive: Boolean,
-      useFieldId: Boolean,
-      timestampNTZEnabled: Boolean): Type = {
+      useFieldId: Boolean): Type = {
     val newParquetType = catalystType match {
       case t: ArrayType if !isPrimitiveCatalystType(t.elementType) =>
         // Only clips array types with nested type as element type.
-        clipParquetListType(parquetType.asGroupType(), t.elementType, caseSensitive, useFieldId,
-          timestampNTZEnabled)
+        clipParquetListType(parquetType.asGroupType(), t.elementType, caseSensitive, useFieldId)
 
       case t: MapType
         if !isPrimitiveCatalystType(t.keyType) ||
            !isPrimitiveCatalystType(t.valueType) =>
         // Only clips map types with nested key type or value type
         clipParquetMapType(
-          parquetType.asGroupType(), t.keyType, t.valueType, caseSensitive, useFieldId,
-          timestampNTZEnabled)
+          parquetType.asGroupType(), t.keyType, t.valueType, caseSensitive, useFieldId)
 
       case t: StructType =>
-        clipParquetGroup(
-          parquetType.asGroupType(), t, caseSensitive, useFieldId, timestampNTZEnabled)
+        clipParquetGroup(parquetType.asGroupType(), t, caseSensitive, useFieldId)
 
       case _ =>
         // UDTs and primitive types are not clipped.  For UDTs, a clipped version might not be able
@@ -266,8 +261,7 @@ object ParquetReadSupport extends Logging {
       parquetList: GroupType,
       elementType: DataType,
       caseSensitive: Boolean,
-      useFieldId: Boolean,
-      timestampNTZEnabled: Boolean): Type = {
+      useFieldId: Boolean): Type = {
     // Precondition of this method, should only be called for lists with nested element types.
     assert(!isPrimitiveCatalystType(elementType))
 
@@ -275,7 +269,7 @@ object ParquetReadSupport extends Logging {
     // list element type is just the group itself.  Clip it.
     if (parquetList.getLogicalTypeAnnotation == null &&
       parquetList.isRepetition(Repetition.REPEATED)) {
-      clipParquetType(parquetList, elementType, caseSensitive, useFieldId, timestampNTZEnabled)
+      clipParquetType(parquetList, elementType, caseSensitive, useFieldId)
     } else {
       assert(
         parquetList.getLogicalTypeAnnotation.isInstanceOf[ListLogicalTypeAnnotation],
@@ -309,15 +303,14 @@ object ParquetReadSupport extends Logging {
           .as(LogicalTypeAnnotation.listType())
           .addField(
             clipParquetType(
-              repeatedGroup, elementType, caseSensitive, useFieldId, timestampNTZEnabled))
+              repeatedGroup, elementType, caseSensitive, useFieldId))
           .named(parquetList.getName)
       } else {
         val newRepeatedGroup = Types
           .repeatedGroup()
           .addField(
             clipParquetType(
-              repeatedGroup.getType(0), elementType, caseSensitive, useFieldId,
-              timestampNTZEnabled))
+              repeatedGroup.getType(0), elementType, caseSensitive, useFieldId))
           .named(repeatedGroup.getName)
 
         val newElementType = if (useFieldId && repeatedGroup.getId != null) {
@@ -347,8 +340,7 @@ object ParquetReadSupport extends Logging {
       keyType: DataType,
       valueType: DataType,
       caseSensitive: Boolean,
-      useFieldId: Boolean,
-      timestampNTZEnabled: Boolean): GroupType = {
+      useFieldId: Boolean): GroupType = {
     // Precondition of this method, only handles maps with nested key types or value types.
     assert(!isPrimitiveCatalystType(keyType) || !isPrimitiveCatalystType(valueType))
 
@@ -361,11 +353,9 @@ object ParquetReadSupport extends Logging {
         .repeatedGroup()
         .as(repeatedGroup.getLogicalTypeAnnotation)
         .addField(
-          clipParquetType(
-            parquetKeyType, keyType, caseSensitive, useFieldId, timestampNTZEnabled))
+          clipParquetType(parquetKeyType, keyType, caseSensitive, useFieldId))
         .addField(
-          clipParquetType(
-            parquetValueType, valueType, caseSensitive, useFieldId, timestampNTZEnabled))
+          clipParquetType(parquetValueType, valueType, caseSensitive, useFieldId))
         .named(repeatedGroup.getName)
       if (useFieldId && repeatedGroup.getId != null) {
         newRepeatedGroup.withId(repeatedGroup.getId.intValue())
@@ -393,11 +383,9 @@ object ParquetReadSupport extends Logging {
       parquetRecord: GroupType,
       structType: StructType,
       caseSensitive: Boolean,
-      useFieldId: Boolean,
-      timestampNTZEnabled: Boolean): GroupType = {
+      useFieldId: Boolean): GroupType = {
     val clippedParquetFields =
-      clipParquetGroupFields(parquetRecord, structType, caseSensitive, useFieldId,
-        timestampNTZEnabled)
+      clipParquetGroupFields(parquetRecord, structType, caseSensitive, useFieldId)
     Types
       .buildGroup(parquetRecord.getRepetition)
       .as(parquetRecord.getLogicalTypeAnnotation)
@@ -414,12 +402,10 @@ object ParquetReadSupport extends Logging {
       parquetRecord: GroupType,
       structType: StructType,
       caseSensitive: Boolean,
-      useFieldId: Boolean,
-      timestampNTZEnabled: Boolean): Seq[Type] = {
+      useFieldId: Boolean): Seq[Type] = {
     val toParquet = new SparkToParquetSchemaConverter(
       writeLegacyParquetFormat = false,
-      useFieldId = useFieldId,
-      timestampNTZEnabled = timestampNTZEnabled)
+      useFieldId = useFieldId)
     lazy val caseSensitiveParquetFieldMap =
         parquetRecord.getFields.asScala.map(f => f.getName -> f).toMap
     lazy val caseInsensitiveParquetFieldMap =
@@ -430,7 +416,7 @@ object ParquetReadSupport extends Logging {
     def matchCaseSensitiveField(f: StructField): Type = {
       caseSensitiveParquetFieldMap
           .get(f.name)
-          .map(clipParquetType(_, f.dataType, caseSensitive, useFieldId, timestampNTZEnabled))
+          .map(clipParquetType(_, f.dataType, caseSensitive, useFieldId))
           .getOrElse(toParquet.convertField(f))
     }
 
@@ -445,8 +431,7 @@ object ParquetReadSupport extends Logging {
               throw QueryExecutionErrors.foundDuplicateFieldInCaseInsensitiveModeError(
                 f.name, parquetTypesString)
             } else {
-              clipParquetType(
-                parquetTypes.head, f.dataType, caseSensitive, useFieldId, timestampNTZEnabled)
+              clipParquetType(parquetTypes.head, f.dataType, caseSensitive, useFieldId)
             }
           }.getOrElse(toParquet.convertField(f))
     }
@@ -462,8 +447,7 @@ object ParquetReadSupport extends Logging {
             throw QueryExecutionErrors.foundDuplicateFieldInFieldIdLookupModeError(
               fieldId, parquetTypesString)
           } else {
-            clipParquetType(
-              parquetTypes.head, f.dataType, caseSensitive, useFieldId, timestampNTZEnabled)
+            clipParquetType(parquetTypes.head, f.dataType, caseSensitive, useFieldId)
           }
         }.getOrElse {
           // When there is no ID match, we use a fake name to avoid a name match by accident
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
index bb0df3639dc..f6b02579d31 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
@@ -48,31 +48,31 @@ import org.apache.spark.sql.types._
  *        [[TimestampType]] fields.
  * @param caseSensitive Whether use case sensitive analysis when comparing Spark catalyst read
  *                      schema with Parquet schema.
- * @param timestampNTZEnabled Whether TimestampNTZType type is enabled.
+ * @param inferTimestampNTZ Whether TimestampNTZType type is enabled.
  */
 class ParquetToSparkSchemaConverter(
     assumeBinaryIsString: Boolean = SQLConf.PARQUET_BINARY_AS_STRING.defaultValue.get,
     assumeInt96IsTimestamp: Boolean = SQLConf.PARQUET_INT96_AS_TIMESTAMP.defaultValue.get,
     caseSensitive: Boolean = SQLConf.CASE_SENSITIVE.defaultValue.get,
-    timestampNTZEnabled: Boolean = SQLConf.PARQUET_TIMESTAMP_NTZ_ENABLED.defaultValue.get) {
+    inferTimestampNTZ: Boolean = SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED.defaultValue.get) {
 
   def this(conf: SQLConf) = this(
     assumeBinaryIsString = conf.isParquetBinaryAsString,
     assumeInt96IsTimestamp = conf.isParquetINT96AsTimestamp,
     caseSensitive = conf.caseSensitiveAnalysis,
-    timestampNTZEnabled = conf.parquetTimestampNTZEnabled)
+    inferTimestampNTZ = conf.parquetInferTimestampNTZEnabled)
 
   def this(conf: Configuration) = this(
     assumeBinaryIsString = conf.get(SQLConf.PARQUET_BINARY_AS_STRING.key).toBoolean,
     assumeInt96IsTimestamp = conf.get(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key).toBoolean,
     caseSensitive = conf.get(SQLConf.CASE_SENSITIVE.key).toBoolean,
-    timestampNTZEnabled = conf.get(SQLConf.PARQUET_TIMESTAMP_NTZ_ENABLED.key).toBoolean)
+    inferTimestampNTZ = conf.get(SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED.key).toBoolean)
 
   /**
    * Returns true if TIMESTAMP_NTZ type is enabled in this ParquetToSparkSchemaConverter.
    */
   def isTimestampNTZEnabled(): Boolean = {
-    timestampNTZEnabled
+    inferTimestampNTZ
   }
 
   /**
@@ -266,7 +266,7 @@ class ParquetToSparkSchemaConverter(
             }
           case timestamp: TimestampLogicalTypeAnnotation
             if timestamp.getUnit == TimeUnit.MICROS || timestamp.getUnit == TimeUnit.MILLIS =>
-            if (timestamp.isAdjustedToUTC || !timestampNTZEnabled) {
+            if (timestamp.isAdjustedToUTC || !inferTimestampNTZ) {
               TimestampType
             } else {
               TimestampNTZType
@@ -460,27 +460,23 @@ class ParquetToSparkSchemaConverter(
  * @param outputTimestampType which parquet timestamp type to use when writing.
  * @param useFieldId whether we should include write field id to Parquet schema. Set this to false
  *        via `spark.sql.parquet.fieldId.write.enabled = false` to disable writing field ids.
- * @param timestampNTZEnabled whether TIMESTAMP_NTZ type support is enabled.
  */
 class SparkToParquetSchemaConverter(
     writeLegacyParquetFormat: Boolean = SQLConf.PARQUET_WRITE_LEGACY_FORMAT.defaultValue.get,
     outputTimestampType: SQLConf.ParquetOutputTimestampType.Value =
       SQLConf.ParquetOutputTimestampType.INT96,
-    useFieldId: Boolean = SQLConf.PARQUET_FIELD_ID_WRITE_ENABLED.defaultValue.get,
-    timestampNTZEnabled: Boolean = SQLConf.PARQUET_TIMESTAMP_NTZ_ENABLED.defaultValue.get) {
+    useFieldId: Boolean = SQLConf.PARQUET_FIELD_ID_WRITE_ENABLED.defaultValue.get) {
 
   def this(conf: SQLConf) = this(
     writeLegacyParquetFormat = conf.writeLegacyParquetFormat,
     outputTimestampType = conf.parquetOutputTimestampType,
-    useFieldId = conf.parquetFieldIdWriteEnabled,
-    timestampNTZEnabled = conf.parquetTimestampNTZEnabled)
+    useFieldId = conf.parquetFieldIdWriteEnabled)
 
   def this(conf: Configuration) = this(
     writeLegacyParquetFormat = conf.get(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key).toBoolean,
     outputTimestampType = SQLConf.ParquetOutputTimestampType.withName(
       conf.get(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key)),
-    useFieldId = conf.get(SQLConf.PARQUET_FIELD_ID_WRITE_ENABLED.key).toBoolean,
-    timestampNTZEnabled = conf.get(SQLConf.PARQUET_TIMESTAMP_NTZ_ENABLED.key).toBoolean)
+    useFieldId = conf.get(SQLConf.PARQUET_FIELD_ID_WRITE_ENABLED.key).toBoolean)
 
   /**
    * Converts a Spark SQL [[StructType]] to a Parquet [[MessageType]].
@@ -568,7 +564,7 @@ class SparkToParquetSchemaConverter(
               .as(LogicalTypeAnnotation.timestampType(true, TimeUnit.MILLIS)).named(field.name)
         }
 
-      case TimestampNTZType if timestampNTZEnabled =>
+      case TimestampNTZType =>
         Types.primitive(INT64, repetition)
           .as(LogicalTypeAnnotation.timestampType(false, TimeUnit.MICROS)).named(field.name)
       case BinaryType =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala
index 33438560e95..de4eda1acfd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala
@@ -461,10 +461,6 @@ object ParquetUtils extends Logging {
       SQLConf.PARQUET_FIELD_ID_WRITE_ENABLED.key,
       sqlConf.parquetFieldIdWriteEnabled.toString)
 
-    conf.set(
-      SQLConf.PARQUET_TIMESTAMP_NTZ_ENABLED.key,
-      sqlConf.parquetTimestampNTZEnabled.toString)
-
     // Sets compression scheme
     conf.set(ParquetOutputFormat.COMPRESSION, parquetOptions.compressionCodecClassName)
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala
index 619a8fe66e3..7495893a911 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala
@@ -89,8 +89,8 @@ case class ParquetScan(
       SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
       sparkSession.sessionState.conf.isParquetINT96AsTimestamp)
     hadoopConf.setBoolean(
-      SQLConf.PARQUET_TIMESTAMP_NTZ_ENABLED.key,
-      sparkSession.sessionState.conf.parquetTimestampNTZEnabled)
+      SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED.key,
+      sparkSession.sessionState.conf.parquetInferTimestampNTZEnabled)
 
     val broadcastedConf = sparkSession.sparkContext.broadcast(
       new SerializableConfiguration(hadoopConf))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFieldIdSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFieldIdSchemaSuite.scala
index 400e0ce28ea..b3babdd3a0c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFieldIdSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFieldIdSchemaSuite.scala
@@ -39,16 +39,14 @@ class ParquetFieldIdSchemaSuite extends ParquetSchemaTest {
       catalystSchema: StructType,
       expectedSchema: String,
       caseSensitive: Boolean = true,
-      useFieldId: Boolean = true,
-      timestampNTZEnabled: Boolean = true): Unit = {
+      useFieldId: Boolean = true): Unit = {
     test(s"Clipping with field id - $testName") {
       val fileSchema = MessageTypeParser.parseMessageType(parquetSchema)
       val actual = ParquetReadSupport.clipParquetSchema(
         fileSchema,
         catalystSchema,
         caseSensitive = caseSensitive,
-        useFieldId = useFieldId,
-        timestampNTZEnabled = timestampNTZEnabled)
+        useFieldId = useFieldId)
 
       // each fake name should be uniquely generated
       val fakeColumnNames = actual.getPaths.asScala.flatten.filter(_.startsWith(FAKE_COLUMN_NAME))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index df96403ac50..8670d95c65e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -156,7 +156,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
   }
 
   test("SPARK-36182: TimestampNTZ") {
-    withSQLConf(SQLConf.PARQUET_TIMESTAMP_NTZ_ENABLED.key -> "true") {
+    withSQLConf(SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED.key -> "true") {
       val data = Seq("2021-01-01T00:00:00", "1970-07-15T01:02:03.456789")
         .map(ts => Tuple1(LocalDateTime.parse(ts)))
       withAllParquetReaders {
@@ -193,9 +193,9 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
         }
         writer.close
 
-        for (timestampNTZEnabled <- Seq(true, false)) {
-          withSQLConf(SQLConf.PARQUET_TIMESTAMP_NTZ_ENABLED.key -> s"$timestampNTZEnabled") {
-            val timestampNTZType = if (timestampNTZEnabled) TimestampNTZType else TimestampType
+        for (inferTimestampNTZ <- Seq(true, false)) {
+          withSQLConf(SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED.key -> s"$inferTimestampNTZ") {
+            val timestampNTZType = if (inferTimestampNTZ) TimestampNTZType else TimestampType
 
             withAllParquetReaders {
               val df = spark.read.parquet(tablePath.toString)
@@ -214,7 +214,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
               val ltz_value = new java.sql.Timestamp(1000L)
               val ntz_value = LocalDateTime.of(1970, 1, 1, 0, 0, 1)
 
-              val exp = if (timestampNTZEnabled) {
+              val exp = if (inferTimestampNTZ) {
                 (0 until numRecords).map { _ =>
                   (ltz_value, ltz_value, ltz_value, ltz_value, ntz_value, ntz_value)
                 }.toDF()
@@ -233,26 +233,18 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
   }
 
   test("Write TimestampNTZ type") {
-    // Writes should fail if timestamp_ntz support is disabled.
-    withSQLConf(SQLConf.PARQUET_TIMESTAMP_NTZ_ENABLED.key -> "false") {
-      withTempPath { dir =>
-        val data = Seq(LocalDateTime.parse("2021-01-01T00:00:00")).toDF("col")
-        val err = intercept[Exception] {
+    // The configuration PARQUET_INFER_TIMESTAMP_NTZ_ENABLED doesn't affect the behavior of writes.
+    Seq(true, false).foreach { inferTimestampNTZ =>
+      withSQLConf(SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED.key -> inferTimestampNTZ.toString) {
+        withTempPath { dir =>
+          val data = Seq(LocalDateTime.parse("2021-01-01T00:00:00")).toDF("col")
           data.write.parquet(dir.getCanonicalPath)
-        }.getCause
-        assert(err.getMessage.contains("Unsupported data type timestamp_ntz"))
-      }
-    }
-
-    withSQLConf(SQLConf.PARQUET_TIMESTAMP_NTZ_ENABLED.key -> "true") {
-      withTempPath { dir =>
-        val data = Seq(LocalDateTime.parse("2021-01-01T00:00:00")).toDF("col")
-        data.write.parquet(dir.getCanonicalPath)
-        assertResult(spark.read.parquet(dir.getCanonicalPath).schema) {
-          StructType(
-            StructField("col", TimestampNTZType, nullable = true) ::
-            Nil
-          )
+          assertResult(spark.read.parquet(dir.getCanonicalPath).schema) {
+            StructType(
+              StructField("col", TimestampNTZType, nullable = true) ::
+                Nil
+            )
+          }
         }
       }
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
index 4c0fbfda681..468e31d1879 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
@@ -63,14 +63,14 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSparkSession {
       binaryAsString: Boolean,
       int96AsTimestamp: Boolean,
       caseSensitive: Boolean = false,
-      timestampNTZEnabled: Boolean = true,
+      inferTimestampNTZ: Boolean = true,
       sparkReadSchema: Option[StructType] = None,
       expectedParquetColumn: Option[ParquetColumn] = None): Unit = {
     val converter = new ParquetToSparkSchemaConverter(
       assumeBinaryIsString = binaryAsString,
       assumeInt96IsTimestamp = int96AsTimestamp,
       caseSensitive = caseSensitive,
-      timestampNTZEnabled = timestampNTZEnabled)
+      inferTimestampNTZ = inferTimestampNTZ)
 
     test(s"sql <= parquet: $testName") {
       val actualParquetColumn = converter.convertParquetColumn(
@@ -97,11 +97,10 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSparkSession {
       writeLegacyParquetFormat: Boolean,
       outputTimestampType: SQLConf.ParquetOutputTimestampType.Value =
         SQLConf.ParquetOutputTimestampType.INT96,
-      timestampNTZEnabled: Boolean = true): Unit = {
+      inferTimestampNTZ: Boolean = true): Unit = {
     val converter = new SparkToParquetSchemaConverter(
       writeLegacyParquetFormat = writeLegacyParquetFormat,
-      outputTimestampType = outputTimestampType,
-      timestampNTZEnabled = timestampNTZEnabled)
+      outputTimestampType = outputTimestampType)
 
     test(s"sql => parquet: $testName") {
       val actual = converter.convert(sqlSchema)
@@ -2261,7 +2260,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
         """.stripMargin,
       binaryAsString = true,
       int96AsTimestamp = int96AsTimestamp,
-      timestampNTZEnabled = true)
+      inferTimestampNTZ = true)
   }
 
   testCatalystToParquet(
@@ -2286,14 +2285,14 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
       |}
       """.stripMargin,
     writeLegacyParquetFormat = false,
-    timestampNTZEnabled = true)
+    inferTimestampNTZ = true)
 
-  for (timestampNTZEnabled <- Seq(true, false)) {
-    val dataType = if (timestampNTZEnabled) TimestampNTZType else TimestampType
+  for (inferTimestampNTZ <- Seq(true, false)) {
+    val dataType = if (inferTimestampNTZ) TimestampNTZType else TimestampType
 
     testParquetToCatalyst(
       "TimestampNTZ Parquet to Spark conversion for complex types, " +
-        s"timestampNTZEnabled: $timestampNTZEnabled",
+        s"inferTimestampNTZ: $inferTimestampNTZ",
       StructType(
         Seq(
           StructField("f1", dataType),
@@ -2315,7 +2314,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
         """.stripMargin,
       binaryAsString = true,
       int96AsTimestamp = false,
-      timestampNTZEnabled = timestampNTZEnabled)
+      inferTimestampNTZ = inferTimestampNTZ)
   }
 
   private def testSchemaClipping(
@@ -2339,8 +2338,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
         MessageTypeParser.parseMessageType(parquetSchema),
         catalystSchema,
         caseSensitive,
-        useFieldId = false,
-        timestampNTZEnabled = true)
+        useFieldId = false)
 
       try {
         expectedSchema.checkContains(actual)
@@ -2907,8 +2905,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
          MessageTypeParser.parseMessageType(parquetSchema),
           catalystSchema,
           caseSensitive = false,
-          useFieldId = false,
-          timestampNTZEnabled = false)
+          useFieldId = false)
       }
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org