You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2023/02/03 18:06:38 UTC

[spark] branch master updated: [SPARK-42296][SQL] Apply spark.sql.inferTimestampNTZInDataSources.enabled on JDBC data source

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 4760a8bd845 [SPARK-42296][SQL] Apply spark.sql.inferTimestampNTZInDataSources.enabled on JDBC data source
4760a8bd845 is described below

commit 4760a8bd845292f7d6d6a35320bd80082a76c7c5
Author: Gengliang Wang <ge...@apache.org>
AuthorDate: Fri Feb 3 10:06:26 2023 -0800

    [SPARK-42296][SQL] Apply spark.sql.inferTimestampNTZInDataSources.enabled on JDBC data source
    
    ### What changes were proposed in this pull request?
    
    Similar to https://github.com/apache/spark/pull/39777 and https://github.com/apache/spark/pull/39812, this PR proposes to use `spark.sql.inferTimestampNTZInDataSources.enabled` to control the behavior of timestamp type inference on JDBC data sources.
    
    ### Why are the changes needed?
    
    Unify the TimestampNTZ type inference behavior over data sources. In JDBC/JSON/CSV data sources, a column can be Timestamp type or TimestampNTZ type. We need a lightweight configuration to control the behavior.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No, TimestampNTZ is not released yet.
    
    ### How was this patch tested?
    
    UTs
    
    Closes #39868 from gengliangwang/jdbcNTZ.
    
    Authored-by: Gengliang Wang <ge...@apache.org>
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
 .../org/apache/spark/sql/internal/SQLConf.scala    |  9 ++---
 .../execution/datasources/jdbc/JDBCOptions.scala   |  7 +++-
 .../org/apache/spark/sql/jdbc/JDBCSuite.scala      | 43 ++++++++++++++++------
 3 files changed, 42 insertions(+), 17 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 1cc3b61b834..363e763be4f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -3520,11 +3520,10 @@ object SQLConf {
 
   val INFER_TIMESTAMP_NTZ_IN_DATA_SOURCES =
     buildConf("spark.sql.inferTimestampNTZInDataSources.enabled")
-      .doc("When true, the TimestampNTZ type is the prior choice of the schema inference " +
-        "over built-in data sources. Otherwise, the inference result will be TimestampLTZ for " +
-        "backward compatibility. As a result, for JSON/CSV files and partition directories  " +
-        "written with TimestampNTZ columns, the inference results will still be of TimestampLTZ " +
-        "types.")
+      .doc("For the schema inference of JSON/CSV/JDBC data sources and partition directories, " +
+        "this config determines whether to choose the TimestampNTZ type if a column can be " +
+        "either TimestampNTZ or TimestampLTZ type. If set to true, the inference result of " +
+        "the column will be TimestampNTZ type. Otherwise, the result will be TimestampLTZ type.")
       .version("3.4.0")
       .booleanConf
       .createWithDefault(false)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
index e725de95335..888951cf9a8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
@@ -26,6 +26,7 @@ import org.apache.spark.SparkFiles
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.sql.internal.SQLConf
 
 /**
  * Options for the JDBC data source.
@@ -232,7 +233,11 @@ class JDBCOptions(
   val prepareQuery = parameters.get(JDBC_PREPARE_QUERY).map(_ + " ").getOrElse("")
 
   // Infers timestamp values as TimestampNTZ type when reading data.
-  val inferTimestampNTZType = parameters.getOrElse(JDBC_INFER_TIMESTAMP_NTZ, "false").toBoolean
+  val inferTimestampNTZType =
+    parameters
+      .get(JDBC_INFER_TIMESTAMP_NTZ)
+      .map(_.toBoolean)
+      .getOrElse(SQLConf.get.inferTimestampNTZInDataSources)
 }
 
 class JdbcOptionsInWrite(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 3e317dc9547..3b3b1bfdb60 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -23,6 +23,7 @@ import java.time.{Instant, LocalDate, LocalDateTime}
 import java.util.{Calendar, GregorianCalendar, Properties, TimeZone}
 
 import scala.collection.JavaConverters._
+import scala.util.Random
 
 import org.mockito.ArgumentMatchers._
 import org.mockito.Mockito._
@@ -1935,13 +1936,26 @@ class JDBCSuite extends QueryTest with SharedSparkSession {
       .option("url", urlWithUserAndPass)
       .option("dbtable", tableName).save()
 
-    val res = spark.read.format("jdbc")
-      .option("inferTimestampNTZType", "true")
+    val readDf = spark.read.format("jdbc")
       .option("url", urlWithUserAndPass)
       .option("dbtable", tableName)
-      .load()
 
-    checkAnswer(res, Seq(Row(null)))
+    Seq(true, false).foreach { inferTimestampNTZ =>
+      val tsType = if (inferTimestampNTZ) {
+        TimestampNTZType
+      } else {
+        TimestampType
+      }
+      val res = readDf.option("inferTimestampNTZType", inferTimestampNTZ).load()
+      checkAnswer(res, Seq(Row(null)))
+      assert(res.schema.fields.head.dataType == tsType)
+      withSQLConf(SQLConf.INFER_TIMESTAMP_NTZ_IN_DATA_SOURCES.key -> inferTimestampNTZ.toString) {
+        val res2 = readDf.load()
+        checkAnswer(res2, Seq(Row(null)))
+        assert(res2.schema.fields.head.dataType == tsType)
+      }
+    }
+
   }
 
   test("SPARK-39339: TimestampNTZType with different local time zones") {
@@ -1961,16 +1975,23 @@ class JDBCSuite extends QueryTest with SharedSparkSession {
             .option("url", urlWithUserAndPass)
             .option("dbtable", tableName)
             .save()
-
-          DateTimeTestUtils.outstandingZoneIds.foreach { zoneId =>
-            DateTimeTestUtils.withDefaultTimeZone(zoneId) {
-              val res = spark.read.format("jdbc")
-                .option("inferTimestampNTZType", "true")
+          val zoneId = DateTimeTestUtils.outstandingZoneIds(
+          Random.nextInt(DateTimeTestUtils.outstandingZoneIds.length))
+          DateTimeTestUtils.withDefaultTimeZone(zoneId) {
+            // Infer TimestampNTZ column with data source option
+            val res = spark.read.format("jdbc")
+              .option("inferTimestampNTZType", "true")
+              .option("url", urlWithUserAndPass)
+              .option("dbtable", tableName)
+              .load()
+            checkAnswer(res, df)
+
+            withSQLConf(SQLConf.INFER_TIMESTAMP_NTZ_IN_DATA_SOURCES.key -> "true") {
+              val res2 = spark.read.format("jdbc")
                 .option("url", urlWithUserAndPass)
                 .option("dbtable", tableName)
                 .load()
-
-              checkAnswer(res, df)
+              checkAnswer(res2, df)
             }
           }
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org