Posted to commits@spark.apache.org by gu...@apache.org on 2023/02/15 11:39:54 UTC

[spark] branch master updated: [SPARK-42442][SQL] Use spark.sql.timestampType for data source inference

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 46226c2f14d [SPARK-42442][SQL] Use spark.sql.timestampType for data source inference
46226c2f14d is described below

commit 46226c2f14db185e95e8f83783a70fa86741b2eb
Author: Gengliang Wang <ge...@apache.org>
AuthorDate: Wed Feb 15 20:39:40 2023 +0900

    [SPARK-42442][SQL] Use spark.sql.timestampType for data source inference
    
    ### What changes were proposed in this pull request?
    
    With the configuration `spark.sql.timestampType`, TIMESTAMP in Spark is a user-specified alias for one of the TIMESTAMP_LTZ and TIMESTAMP_NTZ variants. This is quite complicated for Spark users.
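    
    A minimal sketch of the alias behavior (assuming a running SparkSession `spark`; the literal value is illustrative):
    
    ```scala
    // The session conf decides which concrete type the TIMESTAMP keyword,
    // CAST clause and type literal resolve to.
    spark.conf.set("spark.sql.timestampType", "TIMESTAMP_NTZ")
    spark.sql("SELECT TIMESTAMP '2023-02-15 20:39:40'").schema
    // -> the literal is typed as TimestampNTZType (TIMESTAMP WITHOUT TIME ZONE).
    // With the default "TIMESTAMP_LTZ" it is typed as TimestampType
    // (TIMESTAMP WITH LOCAL TIME ZONE).
    ```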
    
    There is another option, `spark.sql.sources.timestampNTZTypeInference.enabled`, for schema inference. I intended to introduce it in https://github.com/apache/spark/pull/40005, but having two flags seems like too much. After some thought, I decided to merge `spark.sql.sources.timestampNTZTypeInference.enabled` into `spark.sql.timestampType` and let `spark.sql.timestampType` control the schema inference behavior as well.
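    
    A short sketch of the unified behavior (assuming a CSV file at the hypothetical path `/tmp/ts.csv` whose timestamp values have no time-zone component):
    
    ```scala
    // After this change the same conf also drives data source schema inference.
    spark.conf.set("spark.sql.timestampType", "TIMESTAMP_NTZ")
    val df = spark.read
      .option("inferSchema", "true")
      .option("header", "true")
      .csv("/tmp/ts.csv")
    // A column holding values like "2023-02-15 20:39:40" is inferred as
    // TimestampNTZType; under "TIMESTAMP_LTZ" it is inferred as TimestampType.
    ```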
    
    We can have follow-ups to add the data source option "inferTimestampNTZType" for CSV/JSON/partition columns, as the JDBC data source already does.
    
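    For reference, the existing JDBC option can override the session conf per source; a usage sketch (URL and table name are assumptions):
    
    ```scala
    // The per-source option takes precedence over spark.sql.timestampType.
    val jdbcDf = spark.read.format("jdbc")
      .option("url", "jdbc:postgresql://host:5432/db")  // assumed URL
      .option("dbtable", "events")                      // assumed table
      .option("inferTimestampNTZType", "true")          // infer TIMESTAMP as NTZ
      .load()
    ```
    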
    ### Why are the changes needed?
    
    It makes the new feature simpler.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No, the feature is not released yet.
    
    ### How was this patch tested?
    
    Existing UTs.
    I also ran
    ```
    git grep spark.sql.sources.timestampNTZTypeInference.enabled
    git grep INFER_TIMESTAMP_NTZ_IN_DATA_SOURCES
    ```
    to make sure all references to the flag INFER_TIMESTAMP_NTZ_IN_DATA_SOURCES were removed.
    
    Closes #40022 from gengliangwang/unifyInference.
    
    Authored-by: Gengliang Wang <ge...@apache.org>
    Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
 .../spark/sql/catalyst/csv/CSVInferSchema.scala    |  2 +-
 .../spark/sql/catalyst/json/JsonInferSchema.scala  |  2 +-
 .../org/apache/spark/sql/internal/SQLConf.scala    | 27 ++--------
 .../sql/catalyst/csv/CSVInferSchemaSuite.scala     |  5 +-
 .../execution/datasources/PartitioningUtils.scala  |  4 +-
 .../execution/datasources/jdbc/JDBCOptions.scala   |  3 +-
 .../sql/execution/datasources/csv/CSVSuite.scala   | 10 ++--
 .../sql/execution/datasources/json/JsonSuite.scala | 10 ++--
 .../parquet/ParquetPartitionDiscoverySuite.scala   | 62 ++++++++++++++--------
 .../org/apache/spark/sql/jdbc/JDBCSuite.scala      | 13 +++--
 10 files changed, 70 insertions(+), 68 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
index 826e8584db4..bdfa4ac3f0f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
@@ -203,7 +203,7 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable {
     // time-zone component and can be parsed with the timestamp formatter.
     // Otherwise, it is likely to be a timestamp with timezone.
     if (timestampNTZFormatter.parseWithoutTimeZoneOptional(field, false).isDefined) {
-      SQLConf.get.timestampTypeInSchemaInference
+      SQLConf.get.timestampType
     } else {
       tryParseTimestamp(field)
     }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
index b1429e6b215..5385afe8c93 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
@@ -152,7 +152,7 @@ private[sql] class JsonInferSchema(options: JSONOptions) extends Serializable {
           decimalTry.get
         } else if (options.inferTimestamp &&
             timestampNTZFormatter.parseWithoutTimeZoneOptional(field, false).isDefined) {
-          SQLConf.get.timestampTypeInSchemaInference
+          SQLConf.get.timestampType
         } else if (options.inferTimestamp &&
             timestampFormatter.parseOptional(field).isDefined) {
           TimestampType
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 8d8aacbc9cb..e764e0510d9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1425,16 +1425,6 @@ object SQLConf {
       .booleanConf
       .createWithDefault(true)
 
-  val INFER_TIMESTAMP_NTZ_IN_DATA_SOURCES =
-    buildConf("spark.sql.sources.timestampNTZTypeInference.enabled")
-      .doc("For the schema inference of JSON/CSV/JDBC data sources and partition directories, " +
-        "this config determines whether to choose the TimestampNTZ type if a column can be " +
-        "either TimestampNTZ or TimestampLTZ type. If set to true, the inference result of " +
-        "the column will be TimestampNTZ type. Otherwise, the result will be TimestampLTZ type.")
-      .version("3.4.0")
-      .booleanConf
-      .createWithDefault(false)
-
   val BUCKETING_ENABLED = buildConf("spark.sql.sources.bucketing.enabled")
     .doc("When false, we will treat bucketed table as normal table")
     .version("2.0.0")
@@ -3538,8 +3528,9 @@ object SQLConf {
 
   val TIMESTAMP_TYPE =
     buildConf("spark.sql.timestampType")
-      .doc("Configures the default timestamp type of Spark SQL, including SQL DDL, Cast clause " +
-        s"and type literal. Setting the configuration as ${TimestampTypes.TIMESTAMP_NTZ} will " +
+      .doc("Configures the default timestamp type of Spark SQL, including SQL DDL, Cast clause, " +
+        "type literal and the schema inference of data sources. " +
+        s"Setting the configuration as ${TimestampTypes.TIMESTAMP_NTZ} will " +
         "use TIMESTAMP WITHOUT TIME ZONE as the default type while putting it as " +
         s"${TimestampTypes.TIMESTAMP_LTZ} will use TIMESTAMP WITH LOCAL TIME ZONE. " +
         "Before the 3.4.0 release, Spark only supports the TIMESTAMP WITH " +
@@ -4848,18 +4839,6 @@ class SQLConf extends Serializable with Logging {
       TimestampNTZType
   }
 
-  def inferTimestampNTZInDataSources: Boolean = getConf(INFER_TIMESTAMP_NTZ_IN_DATA_SOURCES)
-
-  // Preferred timestamp type in schema reference when a column can be either Timestamp type or
-  // TimestampNTZ type.
-  def timestampTypeInSchemaInference: AtomicType = {
-    if (getConf(INFER_TIMESTAMP_NTZ_IN_DATA_SOURCES)) {
-      TimestampNTZType
-    } else {
-      TimestampType
-    }
-  }
-
   def nestedSchemaPruningEnabled: Boolean = getConf(NESTED_SCHEMA_PRUNING_ENABLED)
 
   def serializerNestedSchemaPruningEnabled: Boolean =
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchemaSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchemaSuite.scala
index fc508d9d09c..acedf7998c2 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchemaSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchemaSuite.scala
@@ -249,10 +249,7 @@ class CSVInferSchemaSuite extends SparkFunSuite with SQLHelper {
     // inferField should infer a column as string type if it contains mixing dates and timestamps
     assert(inferSchema.inferField(DateType, "2003|01|01") == StringType)
     // SQL configuration must be set to default to TimestampNTZ
-    withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> "TIMESTAMP_NTZ") {
-      assert(inferSchema.inferField(DateType, "2003/02/05") == StringType)
-    }
-    withSQLConf(SQLConf.INFER_TIMESTAMP_NTZ_IN_DATA_SOURCES.key -> "true") {
+    withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> SQLConf.TimestampTypes.TIMESTAMP_NTZ.toString) {
       assert(inferSchema.inferField(DateType, "2003/02/05") == StringType)
     }
     assert(inferSchema.inferField(TimestampNTZType, "2012_12_12") == StringType)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index 90c45fd11dd..b3d68003be3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -490,8 +490,8 @@ object PartitioningUtils extends SQLConfHelper {
       val unescapedRaw = unescapePathName(raw)
       // try and parse the date, if no exception occurs this is a candidate to be resolved as
      // TimestampType or TimestampNTZType. The inference timestamp type is controlled by the conf
-      // "spark.sql.sources.timestampNTZTypeInference.enabled".
-      val timestampType = conf.timestampTypeInSchemaInference
+      // "spark.sql.timestampType".
+      val timestampType = conf.timestampType
       timestampType match {
         case TimestampType => timestampFormatter.parse(unescapedRaw)
         case TimestampNTZType => timestampFormatter.parseWithoutTimeZone(unescapedRaw)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
index 888951cf9a8..916ed99303b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
@@ -27,6 +27,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.TimestampNTZType
 
 /**
  * Options for the JDBC data source.
@@ -237,7 +238,7 @@ class JDBCOptions(
     parameters
       .get(JDBC_INFER_TIMESTAMP_NTZ)
       .map(_.toBoolean)
-      .getOrElse(SQLConf.get.inferTimestampNTZInDataSources)
+      .getOrElse(SQLConf.get.timestampType == TimestampNTZType)
 }
 
 class JdbcOptionsInWrite(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index 4cc971a05af..b6cc9ef1cab 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -1047,7 +1047,7 @@ abstract class CSVSuite
         .option("timestampNTZFormat", "yyyy-MM-dd HH:mm:ss.SSSSSS")
         .save(path.getAbsolutePath)
 
-      withSQLConf(SQLConf.INFER_TIMESTAMP_NTZ_IN_DATA_SOURCES.key -> "true") {
+      withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> "TIMESTAMP_NTZ") {
         val res = spark.read
           .format("csv")
           .option("inferSchema", "true")
@@ -1070,7 +1070,7 @@ abstract class CSVSuite
         .option("timestampFormat", "yyyy-MM-dd HH:mm:ss.SSSSSS")
         .save(path.getAbsolutePath)
 
-      withSQLConf(SQLConf.INFER_TIMESTAMP_NTZ_IN_DATA_SOURCES.key -> "false") {
+      withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> "TIMESTAMP_LTZ") {
         val res = spark.read
           .format("csv")
           .option("inferSchema", "true")
@@ -1117,15 +1117,15 @@ abstract class CSVSuite
         SQLConf.TimestampTypes.TIMESTAMP_NTZ.toString,
         SQLConf.TimestampTypes.TIMESTAMP_LTZ.toString)
 
-      Seq(true, false).foreach { inferTimestampNTZ =>
-        withSQLConf(SQLConf.INFER_TIMESTAMP_NTZ_IN_DATA_SOURCES.key -> inferTimestampNTZ.toString) {
+      timestampTypes.foreach { timestampType =>
+        withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> timestampType) {
           val res = spark.read
             .format("csv")
             .option("inferSchema", "true")
             .option("header", "true")
             .load(path.getAbsolutePath)
 
-          if (inferTimestampNTZ) {
+          if (timestampType == SQLConf.TimestampTypes.TIMESTAMP_NTZ.toString) {
             checkAnswer(res, exp)
           } else {
             checkAnswer(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 5595a9670ac..1f9a2da5dd7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -2782,7 +2782,7 @@ abstract class JsonSuite
         .option("timestampNTZFormat", "yyyy-MM-dd HH:mm:ss.SSSSSS")
         .json(path.getAbsolutePath)
 
-      withSQLConf(SQLConf.INFER_TIMESTAMP_NTZ_IN_DATA_SOURCES.key -> "true") {
+      withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> SQLConf.TimestampTypes.TIMESTAMP_NTZ.toString) {
         val res = spark.read
           .option("timestampNTZFormat", "yyyy-MM-dd HH:mm:ss.SSSSSS")
           .option("inferTimestamp", "true")
@@ -2804,7 +2804,7 @@ abstract class JsonSuite
         .option("timestampFormat", "yyyy-MM-dd HH:mm:ss.SSSSSS")
         .json(path.getAbsolutePath)
 
-      withSQLConf(SQLConf.INFER_TIMESTAMP_NTZ_IN_DATA_SOURCES.key -> "false") {
+      withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> SQLConf.TimestampTypes.TIMESTAMP_LTZ.toString) {
         val res = spark.read
           .option("timestampFormat", "yyyy-MM-dd HH:mm:ss.SSSSSS")
           .option("inferTimestamp", "true")
@@ -2847,11 +2847,11 @@ abstract class JsonSuite
         SQLConf.TimestampTypes.TIMESTAMP_NTZ.toString,
         SQLConf.TimestampTypes.TIMESTAMP_LTZ.toString)
 
-      Seq(true, false).foreach { inferTimestampNTZ =>
-        withSQLConf(SQLConf.INFER_TIMESTAMP_NTZ_IN_DATA_SOURCES.key -> inferTimestampNTZ.toString) {
+      timestampTypes.foreach { timestampType =>
+        withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> timestampType) {
           val res = spark.read.option("inferTimestamp", "true").json(path.getAbsolutePath)
 
-          if (inferTimestampNTZ) {
+          if (timestampType == SQLConf.TimestampTypes.TIMESTAMP_NTZ.toString) {
             checkAnswer(res, exp)
           } else {
             checkAnswer(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
index d91320bee7e..43626237b13 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
@@ -82,16 +82,21 @@ abstract class ParquetPartitionDiscoverySuite
     check("1.5", DoubleType)
     check("hello", StringType)
     check("1990-02-24", DateType)
-    // The inferred timestamp type is controlled by `SQLConf.INFER_TIMESTAMP_NTZ_IN_DATA_SOURCES`
-    Seq(false, true).foreach { inferTimestampNTZ =>
-      withSQLConf(SQLConf.INFER_TIMESTAMP_NTZ_IN_DATA_SOURCES.key -> inferTimestampNTZ.toString) {
-        val timestampType = if (inferTimestampNTZ) {
-          TimestampNTZType
-        } else {
-          TimestampType
-        }
-        check("1990-02-24 12:00:30", timestampType)
-        check("1990-02-24 12:00:30", timestampType, ZoneOffset.UTC)
+    // The inferred timestamp type is controlled by `SQLConf.TIMESTAMP_TYPE`
+    val timestampTypes = Seq(
+      SQLConf.TimestampTypes.TIMESTAMP_NTZ.toString,
+      SQLConf.TimestampTypes.TIMESTAMP_LTZ.toString)
+
+    timestampTypes.foreach { timestampType =>
+      withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> timestampType) {
+        val expectedTimestampType =
+          if (timestampType == SQLConf.TimestampTypes.TIMESTAMP_NTZ.toString) {
+            TimestampNTZType
+          } else {
+            TimestampType
+          }
+        check("1990-02-24 12:00:30", expectedTimestampType)
+        check("1990-02-24 12:00:30", expectedTimestampType, ZoneOffset.UTC)
       }
     }
 
@@ -372,9 +377,14 @@ abstract class ParquetPartitionDiscoverySuite
       s"hdfs://host:9000/path2"),
       PartitionSpec.emptySpec)
 
-    // The inferred timestamp type is controlled by `SQLConf.INFER_TIMESTAMP_NTZ_IN_DATA_SOURCES`
-    Seq(false, true).foreach { inferTimestampNTZ =>
-      withSQLConf(SQLConf.INFER_TIMESTAMP_NTZ_IN_DATA_SOURCES.key -> inferTimestampNTZ.toString) {
+    // The inferred timestamp type is controlled by `SQLConf.TIMESTAMP_TYPE`
+    val timestampTypes = Seq(
+      SQLConf.TimestampTypes.TIMESTAMP_NTZ.toString,
+      SQLConf.TimestampTypes.TIMESTAMP_LTZ.toString)
+
+    timestampTypes.foreach { timestampType =>
+      withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> timestampType) {
+        val inferTimestampNTZ = timestampType == SQLConf.TimestampTypes.TIMESTAMP_NTZ.toString
         // The cases below check the resolution for type conflicts.
         val t1 = if (!inferTimestampNTZ) {
           Timestamp.valueOf("2014-01-01 00:00:00.0").getTime * 1000
@@ -396,7 +406,7 @@ abstract class ParquetPartitionDiscoverySuite
           s"hdfs://host:9000/path/a=2014-01-01 00%3A01%3A00.0/b=$defaultPartitionName"),
           PartitionSpec(
             StructType(Seq(
-              StructField("a", SQLConf.get.timestampTypeInSchemaInference),
+              StructField("a", SQLConf.get.timestampType),
               StructField("b", DecimalType(22, 0)))),
             Seq(
               Partition(
@@ -661,9 +671,14 @@ abstract class ParquetPartitionDiscoverySuite
   }
 
   test("Various partition value types") {
-    // The inferred timestamp type is controlled by `SQLConf.INFER_TIMESTAMP_NTZ_IN_DATA_SOURCES`
-    Seq(false, true).foreach { inferTimestampNTZ =>
-      withSQLConf(SQLConf.INFER_TIMESTAMP_NTZ_IN_DATA_SOURCES.key -> inferTimestampNTZ.toString) {
+    // The inferred timestamp type is controlled by `SQLConf.TIMESTAMP_TYPE`
+    val timestampTypes = Seq(
+      SQLConf.TimestampTypes.TIMESTAMP_NTZ.toString,
+      SQLConf.TimestampTypes.TIMESTAMP_LTZ.toString)
+
+    timestampTypes.foreach { timestampType =>
+      withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> timestampType) {
+        val inferTimestampNTZ = timestampType == SQLConf.TimestampTypes.TIMESTAMP_NTZ.toString
         val ts = if (!inferTimestampNTZ) {
           new Timestamp(0)
         } else {
@@ -696,7 +711,7 @@ abstract class ParquetPartitionDiscoverySuite
             DecimalType(10, 5),
             DecimalType.SYSTEM_DEFAULT,
             DateType,
-            SQLConf.get.timestampTypeInSchemaInference,
+            SQLConf.get.timestampType,
             StringType)
 
         val partitionColumns = partitionColumnTypes.zipWithIndex.map {
@@ -727,8 +742,13 @@ abstract class ParquetPartitionDiscoverySuite
   }
 
   test("Various inferred partition value types") {
-    Seq(false, true).foreach { inferTimestampNTZ =>
-      withSQLConf(SQLConf.INFER_TIMESTAMP_NTZ_IN_DATA_SOURCES.key -> inferTimestampNTZ.toString) {
+    val timestampTypes = Seq(
+      SQLConf.TimestampTypes.TIMESTAMP_NTZ.toString,
+      SQLConf.TimestampTypes.TIMESTAMP_LTZ.toString)
+
+    timestampTypes.foreach { timestampType =>
+      withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> timestampType) {
+        val inferTimestampNTZ = timestampType == SQLConf.TimestampTypes.TIMESTAMP_NTZ.toString
         val ts = if (!inferTimestampNTZ) {
           Timestamp.valueOf("1990-02-24 12:00:30")
         } else {
@@ -750,7 +770,7 @@ abstract class ParquetPartitionDiscoverySuite
             DoubleType,
             DecimalType(20, 0),
             DateType,
-            SQLConf.get.timestampTypeInSchemaInference,
+            SQLConf.get.timestampType,
             StringType)
 
         val partitionColumns = partitionColumnTypes.zipWithIndex.map {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 3b3b1bfdb60..975c2886948 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -1940,7 +1940,12 @@ class JDBCSuite extends QueryTest with SharedSparkSession {
       .option("url", urlWithUserAndPass)
       .option("dbtable", tableName)
 
-    Seq(true, false).foreach { inferTimestampNTZ =>
+    val timestampTypes = Seq(
+      SQLConf.TimestampTypes.TIMESTAMP_NTZ.toString,
+      SQLConf.TimestampTypes.TIMESTAMP_LTZ.toString)
+
+    timestampTypes.foreach { timestampType =>
+      val inferTimestampNTZ = timestampType == SQLConf.TimestampTypes.TIMESTAMP_NTZ.toString
       val tsType = if (inferTimestampNTZ) {
         TimestampNTZType
       } else {
@@ -1949,13 +1954,12 @@ class JDBCSuite extends QueryTest with SharedSparkSession {
       val res = readDf.option("inferTimestampNTZType", inferTimestampNTZ).load()
       checkAnswer(res, Seq(Row(null)))
       assert(res.schema.fields.head.dataType == tsType)
-      withSQLConf(SQLConf.INFER_TIMESTAMP_NTZ_IN_DATA_SOURCES.key -> inferTimestampNTZ.toString) {
+      withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> timestampType) {
         val res2 = readDf.load()
         checkAnswer(res2, Seq(Row(null)))
         assert(res2.schema.fields.head.dataType == tsType)
       }
     }
-
   }
 
   test("SPARK-39339: TimestampNTZType with different local time zones") {
@@ -1986,7 +1990,8 @@ class JDBCSuite extends QueryTest with SharedSparkSession {
               .load()
             checkAnswer(res, df)
 
-            withSQLConf(SQLConf.INFER_TIMESTAMP_NTZ_IN_DATA_SOURCES.key -> "true") {
+            withSQLConf(
+              SQLConf.TIMESTAMP_TYPE.key -> SQLConf.TimestampTypes.TIMESTAMP_NTZ.toString) {
               val res2 = spark.read.format("jdbc")
                 .option("url", urlWithUserAndPass)
                 .option("dbtable", tableName)

