You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2023/02/03 18:06:38 UTC
[spark] branch master updated: [SPARK-42296][SQL] Apply spark.sql.inferTimestampNTZInDataSources.enabled on JDBC data source
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 4760a8bd845 [SPARK-42296][SQL] Apply spark.sql.inferTimestampNTZInDataSources.enabled on JDBC data source
4760a8bd845 is described below
commit 4760a8bd845292f7d6d6a35320bd80082a76c7c5
Author: Gengliang Wang <ge...@apache.org>
AuthorDate: Fri Feb 3 10:06:26 2023 -0800
[SPARK-42296][SQL] Apply spark.sql.inferTimestampNTZInDataSources.enabled on JDBC data source
### What changes were proposed in this pull request?
Similar to https://github.com/apache/spark/pull/39777 and https://github.com/apache/spark/pull/39812, this PR proposes using `spark.sql.inferTimestampNTZInDataSources.enabled` to control timestamp type inference in the JDBC data source. When the per-read JDBC option `inferTimestampNTZType` is set explicitly, it still takes precedence over the SQL config.
### Why are the changes needed?
Unify the TimestampNTZ type inference behavior across data sources. In the JDBC, JSON, and CSV data sources, a column can be inferred as either Timestamp (TimestampLTZ) or TimestampNTZ type, so a single lightweight configuration is needed to control this behavior.
### Does this PR introduce _any_ user-facing change?
No. TimestampNTZ has not been released yet.
### How was this patch tested?
Unit tests (UTs) in JDBCSuite.
Closes #39868 from gengliangwang/jdbcNTZ.
Authored-by: Gengliang Wang <ge...@apache.org>
Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
.../org/apache/spark/sql/internal/SQLConf.scala | 9 ++---
.../execution/datasources/jdbc/JDBCOptions.scala | 7 +++-
.../org/apache/spark/sql/jdbc/JDBCSuite.scala | 43 ++++++++++++++++------
3 files changed, 42 insertions(+), 17 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 1cc3b61b834..363e763be4f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -3520,11 +3520,10 @@ object SQLConf {
val INFER_TIMESTAMP_NTZ_IN_DATA_SOURCES =
buildConf("spark.sql.inferTimestampNTZInDataSources.enabled")
- .doc("When true, the TimestampNTZ type is the prior choice of the schema inference " +
- "over built-in data sources. Otherwise, the inference result will be TimestampLTZ for " +
- "backward compatibility. As a result, for JSON/CSV files and partition directories " +
- "written with TimestampNTZ columns, the inference results will still be of TimestampLTZ " +
- "types.")
+ .doc("For the schema inference of JSON/CSV/JDBC data sources and partition directories, " +
+ "this config determines whether to choose the TimestampNTZ type if a column can be " +
+ "either TimestampNTZ or TimestampLTZ type. If set to true, the inference result of " +
+ "the column will be TimestampNTZ type. Otherwise, the result will be TimestampLTZ type.")
.version("3.4.0")
.booleanConf
.createWithDefault(false)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
index e725de95335..888951cf9a8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
@@ -26,6 +26,7 @@ import org.apache.spark.SparkFiles
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.sql.internal.SQLConf
/**
* Options for the JDBC data source.
@@ -232,7 +233,11 @@ class JDBCOptions(
val prepareQuery = parameters.get(JDBC_PREPARE_QUERY).map(_ + " ").getOrElse("")
// Infers timestamp values as TimestampNTZ type when reading data.
- val inferTimestampNTZType = parameters.getOrElse(JDBC_INFER_TIMESTAMP_NTZ, "false").toBoolean
+ val inferTimestampNTZType =
+ parameters
+ .get(JDBC_INFER_TIMESTAMP_NTZ)
+ .map(_.toBoolean)
+ .getOrElse(SQLConf.get.inferTimestampNTZInDataSources)
}
class JdbcOptionsInWrite(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 3e317dc9547..3b3b1bfdb60 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -23,6 +23,7 @@ import java.time.{Instant, LocalDate, LocalDateTime}
import java.util.{Calendar, GregorianCalendar, Properties, TimeZone}
import scala.collection.JavaConverters._
+import scala.util.Random
import org.mockito.ArgumentMatchers._
import org.mockito.Mockito._
@@ -1935,13 +1936,26 @@ class JDBCSuite extends QueryTest with SharedSparkSession {
.option("url", urlWithUserAndPass)
.option("dbtable", tableName).save()
- val res = spark.read.format("jdbc")
- .option("inferTimestampNTZType", "true")
+ val readDf = spark.read.format("jdbc")
.option("url", urlWithUserAndPass)
.option("dbtable", tableName)
- .load()
- checkAnswer(res, Seq(Row(null)))
+ Seq(true, false).foreach { inferTimestampNTZ =>
+ val tsType = if (inferTimestampNTZ) {
+ TimestampNTZType
+ } else {
+ TimestampType
+ }
+ val res = readDf.option("inferTimestampNTZType", inferTimestampNTZ).load()
+ checkAnswer(res, Seq(Row(null)))
+ assert(res.schema.fields.head.dataType == tsType)
+ withSQLConf(SQLConf.INFER_TIMESTAMP_NTZ_IN_DATA_SOURCES.key -> inferTimestampNTZ.toString) {
+ val res2 = readDf.load()
+ checkAnswer(res2, Seq(Row(null)))
+ assert(res2.schema.fields.head.dataType == tsType)
+ }
+ }
+
}
test("SPARK-39339: TimestampNTZType with different local time zones") {
@@ -1961,16 +1975,23 @@ class JDBCSuite extends QueryTest with SharedSparkSession {
.option("url", urlWithUserAndPass)
.option("dbtable", tableName)
.save()
-
- DateTimeTestUtils.outstandingZoneIds.foreach { zoneId =>
- DateTimeTestUtils.withDefaultTimeZone(zoneId) {
- val res = spark.read.format("jdbc")
- .option("inferTimestampNTZType", "true")
+ val zoneId = DateTimeTestUtils.outstandingZoneIds(
+ Random.nextInt(DateTimeTestUtils.outstandingZoneIds.length))
+ DateTimeTestUtils.withDefaultTimeZone(zoneId) {
+ // Infer TimestmapNTZ column with data source option
+ val res = spark.read.format("jdbc")
+ .option("inferTimestampNTZType", "true")
+ .option("url", urlWithUserAndPass)
+ .option("dbtable", tableName)
+ .load()
+ checkAnswer(res, df)
+
+ withSQLConf(SQLConf.INFER_TIMESTAMP_NTZ_IN_DATA_SOURCES.key -> "true") {
+ val res2 = spark.read.format("jdbc")
.option("url", urlWithUserAndPass)
.option("dbtable", tableName)
.load()
-
- checkAnswer(res, df)
+ checkAnswer(res2, df)
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org