You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ge...@apache.org on 2022/06/30 20:55:23 UTC
[spark] branch master updated: [SPARK-39339][SQL][FOLLOWUP] Fix bug TimestampNTZ type in JDBC data source is incorrect
This is an automated email from the ASF dual-hosted git repository.
gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new ede4b7da693 [SPARK-39339][SQL][FOLLOWUP] Fix bug TimestampNTZ type in JDBC data source is incorrect
ede4b7da693 is described below
commit ede4b7da6932c756f26a93eb1006819366fdbb70
Author: Jiaan Geng <be...@163.com>
AuthorDate: Thu Jun 30 13:55:04 2022 -0700
[SPARK-39339][SQL][FOLLOWUP] Fix bug TimestampNTZ type in JDBC data source is incorrect
### What changes were proposed in this pull request?
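https://github.com/apache/spark/pull/36726 added support for the TimestampNTZ type in the JDBC data source.
However, the implementation is incorrect.
Modifying one existing test case so that it reads the data back under different JVM default time zones is enough to make it fail.
The modified test case is shown below.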
```
test("SPARK-39339: TimestampNTZType with different local time zones") {
val tableName = "timestamp_ntz_diff_tz_support_table"
DateTimeTestUtils.outstandingZoneIds.foreach { zoneId =>
DateTimeTestUtils.withDefaultTimeZone(zoneId) {
Seq(
"1972-07-04 03:30:00",
"2019-01-20 12:00:00.502",
"2019-01-20T00:00:00.123456",
"1500-01-20T00:00:00.123456"
).foreach { case datetime =>
val df = spark.sql(s"select timestamp_ntz '$datetime'")
df.write.format("jdbc")
.mode("overwrite")
.option("url", urlWithUserAndPass)
.option("dbtable", tableName)
.save()
DateTimeTestUtils.outstandingZoneIds.foreach { zoneId =>
DateTimeTestUtils.withDefaultTimeZone(zoneId) {
val res = spark.read.format("jdbc")
.option("inferTimestampNTZType", "true")
.option("url", urlWithUserAndPass)
.option("dbtable", tableName)
.load()
checkAnswer(res, df)
}
}
}
}
}
}
```
The test fails with the following output:
```
Results do not match for query:
Timezone: sun.util.calendar.ZoneInfo[id="Africa/Dakar",offset=0,dstSavings=0,useDaylight=false,transitions=3,lastRule=null]
Timezone Env:
== Parsed Logical Plan ==
Relation [TIMESTAMP_NTZ '1500-01-20 00:00:00.123456'#253] JDBCRelation(timestamp_ntz_diff_tz_support_table) [numPartitions=1]
== Analyzed Logical Plan ==
TIMESTAMP_NTZ '1500-01-20 00:00:00.123456': timestamp_ntz
Relation [TIMESTAMP_NTZ '1500-01-20 00:00:00.123456'#253] JDBCRelation(timestamp_ntz_diff_tz_support_table) [numPartitions=1]
== Optimized Logical Plan ==
Relation [TIMESTAMP_NTZ '1500-01-20 00:00:00.123456'#253] JDBCRelation(timestamp_ntz_diff_tz_support_table) [numPartitions=1]
== Physical Plan ==
*(1) Scan JDBCRelation(timestamp_ntz_diff_tz_support_table) [numPartitions=1] [TIMESTAMP_NTZ '1500-01-20 00:00:00.123456'#253] PushedFilters: [], ReadSchema: struct<TIMESTAMP_NTZ '1500-01-20 00:00:00.123456':timestamp_ntz>
== Results ==
== Results ==
!== Correct Answer - 1 == == Spark Answer - 1 ==
struct<TIMESTAMP_NTZ '1500-01-20 00:00:00.123456':timestamp_ntz> struct<TIMESTAMP_NTZ '1500-01-20 00:00:00.123456':timestamp_ntz>
![1500-01-20T00:00:00.123456] [1500-01-20T00:16:08.123456]
ScalaTestFailureLocation: org.apache.spark.sql.QueryTest$ at (QueryTest.scala:243)
org.scalatest.exceptions.TestFailedException:
```
### Why are the changes needed?
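Fix an implementation bug.
The bug is caused by using `toJavaTimestamp` and `fromJavaTimestamp` for TimestampNTZ values.
These functions rebase timestamps between the Proleptic Gregorian and the hybrid Julian calendars, and that rebasing depends on the JVM system time zone. As a result, TimestampNTZ values read back through JDBC can be shifted. For example, `1500-01-20T00:00:00.123456` is read back as `1500-01-20T00:16:08.123456` when the default time zone is `Africa/Dakar`.
This PR makes three changes:
- It adds `toJavaTimestampNoRebase` and `fromJavaTimestampNoRebase`, which skip the rebasing.
- It uses these functions for TimestampNTZType in `JdbcUtils`.
- On the write path, it always reads TimestampNTZ values as `java.time.LocalDateTime`, regardless of `datetimeJava8ApiEnabled`.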
### Does this PR introduce _any_ user-facing change?
No.
This only fixes the newly added TimestampNTZ support in the JDBC data source.
### How was this patch tested?
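Modified an existing test case in `JDBCSuite` so that the written data is read back under each of the `DateTimeTestUtils.outstandingZoneIds` default time zones.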
Closes #37013 from beliefer/SPARK-39339_followup.
Authored-by: Jiaan Geng <be...@163.com>
Signed-off-by: Gengliang Wang <ge...@apache.org>
---
.../spark/sql/catalyst/util/DateTimeUtils.scala | 32 ++++++++++++++++------
.../sql/execution/datasources/jdbc/JdbcUtils.scala | 26 ++++++++++--------
.../org/apache/spark/sql/jdbc/JDBCSuite.scala | 18 +++++++-----
3 files changed, 49 insertions(+), 27 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
index cc61491dc95..5045d1479f2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
@@ -158,11 +158,19 @@ object DateTimeUtils {
* @param micros The number of microseconds since 1970-01-01T00:00:00.000000Z.
* @return A `java.sql.Timestamp` from number of micros since epoch.
*/
- def toJavaTimestamp(micros: Long): Timestamp = {
- val rebasedMicros = rebaseGregorianToJulianMicros(micros)
- val seconds = Math.floorDiv(rebasedMicros, MICROS_PER_SECOND)
+ def toJavaTimestamp(micros: Long): Timestamp =
+ toJavaTimestampNoRebase(rebaseGregorianToJulianMicros(micros))
+
+ /**
+ * Converts microseconds since the epoch to an instance of `java.sql.Timestamp`.
+ *
+ * @param micros The number of microseconds since 1970-01-01T00:00:00.000000Z.
+ * @return A `java.sql.Timestamp` from number of micros since epoch.
+ */
+ def toJavaTimestampNoRebase(micros: Long): Timestamp = {
+ val seconds = Math.floorDiv(micros, MICROS_PER_SECOND)
val ts = new Timestamp(seconds * MILLIS_PER_SECOND)
- val nanos = (rebasedMicros - seconds * MICROS_PER_SECOND) * NANOS_PER_MICROS
+ val nanos = (micros - seconds * MICROS_PER_SECOND) * NANOS_PER_MICROS
ts.setNanos(nanos.toInt)
ts
}
@@ -186,10 +194,18 @@ object DateTimeUtils {
* Gregorian calendars.
* @return The number of micros since epoch from `java.sql.Timestamp`.
*/
- def fromJavaTimestamp(t: Timestamp): Long = {
- val micros = millisToMicros(t.getTime) + (t.getNanos / NANOS_PER_MICROS) % MICROS_PER_MILLIS
- rebaseJulianToGregorianMicros(micros)
- }
+ def fromJavaTimestamp(t: Timestamp): Long =
+ rebaseJulianToGregorianMicros(fromJavaTimestampNoRebase(t))
+
+ /**
+ * Converts an instance of `java.sql.Timestamp` to the number of microseconds since
+ * 1970-01-01T00:00:00.000000Z.
+ *
+ * @param t an instance of `java.sql.Timestamp`.
+ * @return The number of micros since epoch from `java.sql.Timestamp`.
+ */
+ def fromJavaTimestampNoRebase(t: Timestamp): Long =
+ millisToMicros(t.getTime) + (t.getNanos / NANOS_PER_MICROS) % MICROS_PER_MILLIS
/**
* Converts an Java object to microseconds.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index cc8746ea5c4..fa4c032fcb0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -38,7 +38,7 @@ import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils, GenericArrayData}
-import org.apache.spark.sql.catalyst.util.DateTimeUtils.{instantToMicros, localDateTimeToMicros, localDateToDays, toJavaDate, toJavaTimestamp}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils.{instantToMicros, localDateTimeToMicros, localDateToDays, toJavaDate, toJavaTimestamp, toJavaTimestampNoRebase}
import org.apache.spark.sql.connector.catalog.TableChange
import org.apache.spark.sql.connector.catalog.index.{SupportsIndex, TableIndex}
import org.apache.spark.sql.connector.expressions.NamedReference
@@ -473,7 +473,7 @@ object JdbcUtils extends Logging with SQLConfHelper {
}
}
- case TimestampType | TimestampNTZType =>
+ case TimestampType =>
(rs: ResultSet, row: InternalRow, pos: Int) =>
val t = rs.getTimestamp(pos + 1)
if (t != null) {
@@ -482,6 +482,15 @@ object JdbcUtils extends Logging with SQLConfHelper {
row.update(pos, null)
}
+ case TimestampNTZType =>
+ (rs: ResultSet, row: InternalRow, pos: Int) =>
+ val t = rs.getTimestamp(pos + 1)
+ if (t != null) {
+ row.setLong(pos, DateTimeUtils.fromJavaTimestampNoRebase(t))
+ } else {
+ row.update(pos, null)
+ }
+
case BinaryType =>
(rs: ResultSet, row: InternalRow, pos: Int) =>
row.update(pos, rs.getBytes(pos + 1))
@@ -594,16 +603,9 @@ object JdbcUtils extends Logging with SQLConfHelper {
}
case TimestampNTZType =>
- if (conf.datetimeJava8ApiEnabled) {
- (stmt: PreparedStatement, row: Row, pos: Int) =>
- stmt.setTimestamp(pos + 1, toJavaTimestamp(instantToMicros(row.getAs[Instant](pos))))
- } else {
- (stmt: PreparedStatement, row: Row, pos: Int) =>
- stmt.setTimestamp(
- pos + 1,
- toJavaTimestamp(localDateTimeToMicros(row.getAs[java.time.LocalDateTime](pos)))
- )
- }
+ (stmt: PreparedStatement, row: Row, pos: Int) =>
+ val micros = localDateTimeToMicros(row.getAs[java.time.LocalDateTime](pos))
+ stmt.setTimestamp(pos + 1, toJavaTimestampNoRebase(micros))
case DateType =>
if (conf.datetimeJava8ApiEnabled) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 9c28e204296..494ae6d5487 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -1950,13 +1950,17 @@ class JDBCSuite extends QueryTest
.option("dbtable", tableName)
.save()
- val res = spark.read.format("jdbc")
- .option("inferTimestampNTZType", "true")
- .option("url", urlWithUserAndPass)
- .option("dbtable", tableName)
- .load()
-
- checkAnswer(res, df)
+ DateTimeTestUtils.outstandingZoneIds.foreach { zoneId =>
+ DateTimeTestUtils.withDefaultTimeZone(zoneId) {
+ val res = spark.read.format("jdbc")
+ .option("inferTimestampNTZType", "true")
+ .option("url", urlWithUserAndPass)
+ .option("dbtable", tableName)
+ .load()
+
+ checkAnswer(res, df)
+ }
+ }
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org