
[spark] branch master updated: [SPARK-39339][SQL][FOLLOWUP] Fix bug TimestampNTZ type in JDBC data source is incorrect

This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new ede4b7da693 [SPARK-39339][SQL][FOLLOWUP] Fix bug TimestampNTZ type in JDBC data source is incorrect
ede4b7da693 is described below

commit ede4b7da6932c756f26a93eb1006819366fdbb70
Author: Jiaan Geng <be...@163.com>
AuthorDate: Thu Jun 30 13:55:04 2022 -0700

    [SPARK-39339][SQL][FOLLOWUP] Fix bug TimestampNTZ type in JDBC data source is incorrect
    
    ### What changes were proposed in this pull request?
    https://github.com/apache/spark/pull/36726 added support for the TimestampNTZ type in the JDBC data source, but the implementation is incorrect.
    If only the test case below is updated (without the rest of this PR), the test fails.
    The updated test case:
    ```
      test("SPARK-39339: TimestampNTZType with different local time zones") {
        val tableName = "timestamp_ntz_diff_tz_support_table"
    
        DateTimeTestUtils.outstandingZoneIds.foreach { zoneId =>
          DateTimeTestUtils.withDefaultTimeZone(zoneId) {
            Seq(
              "1972-07-04 03:30:00",
              "2019-01-20 12:00:00.502",
              "2019-01-20T00:00:00.123456",
              "1500-01-20T00:00:00.123456"
            ).foreach { case datetime =>
              val df = spark.sql(s"select timestamp_ntz '$datetime'")
              df.write.format("jdbc")
                .mode("overwrite")
                .option("url", urlWithUserAndPass)
                .option("dbtable", tableName)
                .save()
    
              DateTimeTestUtils.outstandingZoneIds.foreach { zoneId =>
                DateTimeTestUtils.withDefaultTimeZone(zoneId) {
                  val res = spark.read.format("jdbc")
                    .option("inferTimestampNTZType", "true")
                    .option("url", urlWithUserAndPass)
                    .option("dbtable", tableName)
                    .load()
    
                  checkAnswer(res, df)
                }
              }
            }
          }
        }
      }
    ```
    
    The test failure output is shown below.
    ```
    Results do not match for query:
    Timezone: sun.util.calendar.ZoneInfo[id="Africa/Dakar",offset=0,dstSavings=0,useDaylight=false,transitions=3,lastRule=null]
    Timezone Env:
    
    == Parsed Logical Plan ==
    Relation [TIMESTAMP_NTZ '1500-01-20 00:00:00.123456'#253] JDBCRelation(timestamp_ntz_diff_tz_support_table) [numPartitions=1]
    
    == Analyzed Logical Plan ==
    TIMESTAMP_NTZ '1500-01-20 00:00:00.123456': timestamp_ntz
    Relation [TIMESTAMP_NTZ '1500-01-20 00:00:00.123456'#253] JDBCRelation(timestamp_ntz_diff_tz_support_table) [numPartitions=1]
    
    == Optimized Logical Plan ==
    Relation [TIMESTAMP_NTZ '1500-01-20 00:00:00.123456'#253] JDBCRelation(timestamp_ntz_diff_tz_support_table) [numPartitions=1]
    
    == Physical Plan ==
    *(1) Scan JDBCRelation(timestamp_ntz_diff_tz_support_table) [numPartitions=1] [TIMESTAMP_NTZ '1500-01-20 00:00:00.123456'#253] PushedFilters: [], ReadSchema: struct<TIMESTAMP_NTZ '1500-01-20 00:00:00.123456':timestamp_ntz>
    
    == Results ==
    !== Correct Answer - 1 ==                                           == Spark Answer - 1 ==
     struct<TIMESTAMP_NTZ '1500-01-20 00:00:00.123456':timestamp_ntz>   struct<TIMESTAMP_NTZ '1500-01-20 00:00:00.123456':timestamp_ntz>
    ![1500-01-20T00:00:00.123456]                                       [1500-01-20T00:16:08.123456]
    
    ScalaTestFailureLocation: org.apache.spark.sql.QueryTest$ at (QueryTest.scala:243)
    org.scalatest.exceptions.TestFailedException:
    ```
    
    ### Why are the changes needed?
    Fix an implementation bug.
    The root cause is the use of `toJavaTimestamp` and `fromJavaTimestamp` for TimestampNTZ values: their calendar rebasing depends on the JVM's default time zone, so the stored timestamp shifts when the write-side and read-side time zones differ.
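    A minimal, self-contained sketch of the idea behind the fix (plain JDK only; the object name, inlined constants and helpers below are illustrative, not Spark's actual `DateTimeUtils`): the `NoRebase` conversions are pure arithmetic on microseconds since the epoch, so a TIMESTAMP_NTZ value survives the `micros -> java.sql.Timestamp -> micros` round trip exactly, regardless of the JVM default time zone.
    ```
    import java.sql.Timestamp
    import java.time.{LocalDateTime, ZoneOffset}

    object NoRebaseRoundTripSketch {
      private val MicrosPerSecond = 1000000L
      private val MillisPerSecond = 1000L
      private val NanosPerMicros = 1000L
      private val MicrosPerMillis = 1000L

      // Mirrors the new DateTimeUtils.toJavaTimestampNoRebase: no calendar rebase,
      // no time zone lookup, just micros -> (seconds, nanos).
      def toJavaTimestampNoRebase(micros: Long): Timestamp = {
        val seconds = Math.floorDiv(micros, MicrosPerSecond)
        val ts = new Timestamp(seconds * MillisPerSecond)
        ts.setNanos(((micros - seconds * MicrosPerSecond) * NanosPerMicros).toInt)
        ts
      }

      // Mirrors the new DateTimeUtils.fromJavaTimestampNoRebase: the exact inverse.
      def fromJavaTimestampNoRebase(t: Timestamp): Long =
        t.getTime * MicrosPerMillis + (t.getNanos / NanosPerMicros) % MicrosPerMillis

      def main(args: Array[String]): Unit = {
        // TIMESTAMP_NTZ stores a wall-clock value as micros since the epoch, as if it were UTC.
        val ldt = LocalDateTime.parse("1500-01-20T00:00:00.123456")
        val micros = ldt.toEpochSecond(ZoneOffset.UTC) * MicrosPerSecond + ldt.getNano / NanosPerMicros
        // Lossless round trip, independent of the JVM default time zone.
        assert(fromJavaTimestampNoRebase(toJavaTimestampNoRebase(micros)) == micros)
      }
    }
    ```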
    
    ### Does this PR introduce _any_ user-facing change?
    No. This only fixes a new feature.
    
    ### How was this patch tested?
    New test case.
    
    Closes #37013 from beliefer/SPARK-39339_followup.
    
    Authored-by: Jiaan Geng <be...@163.com>
    Signed-off-by: Gengliang Wang <ge...@apache.org>
---
 .../spark/sql/catalyst/util/DateTimeUtils.scala    | 32 ++++++++++++++++------
 .../sql/execution/datasources/jdbc/JdbcUtils.scala | 26 ++++++++++--------
 .../org/apache/spark/sql/jdbc/JDBCSuite.scala      | 18 +++++++-----
 3 files changed, 49 insertions(+), 27 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
index cc61491dc95..5045d1479f2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
@@ -158,11 +158,19 @@ object DateTimeUtils {
    * @param micros The number of microseconds since 1970-01-01T00:00:00.000000Z.
    * @return A `java.sql.Timestamp` from number of micros since epoch.
    */
-  def toJavaTimestamp(micros: Long): Timestamp = {
-    val rebasedMicros = rebaseGregorianToJulianMicros(micros)
-    val seconds = Math.floorDiv(rebasedMicros, MICROS_PER_SECOND)
+  def toJavaTimestamp(micros: Long): Timestamp =
+    toJavaTimestampNoRebase(rebaseGregorianToJulianMicros(micros))
+
+  /**
+   * Converts microseconds since the epoch to an instance of `java.sql.Timestamp`.
+   *
+   * @param micros The number of microseconds since 1970-01-01T00:00:00.000000Z.
+   * @return A `java.sql.Timestamp` from number of micros since epoch.
+   */
+  def toJavaTimestampNoRebase(micros: Long): Timestamp = {
+    val seconds = Math.floorDiv(micros, MICROS_PER_SECOND)
     val ts = new Timestamp(seconds * MILLIS_PER_SECOND)
-    val nanos = (rebasedMicros - seconds * MICROS_PER_SECOND) * NANOS_PER_MICROS
+    val nanos = (micros - seconds * MICROS_PER_SECOND) * NANOS_PER_MICROS
     ts.setNanos(nanos.toInt)
     ts
   }
@@ -186,10 +194,18 @@ object DateTimeUtils {
    *          Gregorian calendars.
    * @return The number of micros since epoch from `java.sql.Timestamp`.
    */
-  def fromJavaTimestamp(t: Timestamp): Long = {
-    val micros = millisToMicros(t.getTime) + (t.getNanos / NANOS_PER_MICROS) % MICROS_PER_MILLIS
-    rebaseJulianToGregorianMicros(micros)
-  }
+  def fromJavaTimestamp(t: Timestamp): Long =
+    rebaseJulianToGregorianMicros(fromJavaTimestampNoRebase(t))
+
+  /**
+   * Converts an instance of `java.sql.Timestamp` to the number of microseconds since
+   * 1970-01-01T00:00:00.000000Z.
+   *
+   * @param t an instance of `java.sql.Timestamp`.
+   * @return The number of micros since epoch from `java.sql.Timestamp`.
+   */
+  def fromJavaTimestampNoRebase(t: Timestamp): Long =
+    millisToMicros(t.getTime) + (t.getNanos / NANOS_PER_MICROS) % MICROS_PER_MILLIS
 
   /**
    * Converts an Java object to microseconds.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index cc8746ea5c4..fa4c032fcb0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -38,7 +38,7 @@ import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils, GenericArrayData}
-import org.apache.spark.sql.catalyst.util.DateTimeUtils.{instantToMicros, localDateTimeToMicros, localDateToDays, toJavaDate, toJavaTimestamp}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils.{instantToMicros, localDateTimeToMicros, localDateToDays, toJavaDate, toJavaTimestamp, toJavaTimestampNoRebase}
 import org.apache.spark.sql.connector.catalog.TableChange
 import org.apache.spark.sql.connector.catalog.index.{SupportsIndex, TableIndex}
 import org.apache.spark.sql.connector.expressions.NamedReference
@@ -473,7 +473,7 @@ object JdbcUtils extends Logging with SQLConfHelper {
         }
       }
 
-    case TimestampType | TimestampNTZType =>
+    case TimestampType =>
       (rs: ResultSet, row: InternalRow, pos: Int) =>
         val t = rs.getTimestamp(pos + 1)
         if (t != null) {
@@ -482,6 +482,15 @@ object JdbcUtils extends Logging with SQLConfHelper {
           row.update(pos, null)
         }
 
+    case TimestampNTZType =>
+      (rs: ResultSet, row: InternalRow, pos: Int) =>
+        val t = rs.getTimestamp(pos + 1)
+        if (t != null) {
+          row.setLong(pos, DateTimeUtils.fromJavaTimestampNoRebase(t))
+        } else {
+          row.update(pos, null)
+        }
+
     case BinaryType =>
       (rs: ResultSet, row: InternalRow, pos: Int) =>
         row.update(pos, rs.getBytes(pos + 1))
@@ -594,16 +603,9 @@ object JdbcUtils extends Logging with SQLConfHelper {
       }
 
     case TimestampNTZType =>
-      if (conf.datetimeJava8ApiEnabled) {
-        (stmt: PreparedStatement, row: Row, pos: Int) =>
-          stmt.setTimestamp(pos + 1, toJavaTimestamp(instantToMicros(row.getAs[Instant](pos))))
-      } else {
-        (stmt: PreparedStatement, row: Row, pos: Int) =>
-          stmt.setTimestamp(
-            pos + 1,
-            toJavaTimestamp(localDateTimeToMicros(row.getAs[java.time.LocalDateTime](pos)))
-          )
-      }
+      (stmt: PreparedStatement, row: Row, pos: Int) =>
+        val micros = localDateTimeToMicros(row.getAs[java.time.LocalDateTime](pos))
+        stmt.setTimestamp(pos + 1, toJavaTimestampNoRebase(micros))
 
     case DateType =>
       if (conf.datetimeJava8ApiEnabled) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 9c28e204296..494ae6d5487 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -1950,13 +1950,17 @@ class JDBCSuite extends QueryTest
             .option("dbtable", tableName)
             .save()
 
-          val res = spark.read.format("jdbc")
-            .option("inferTimestampNTZType", "true")
-            .option("url", urlWithUserAndPass)
-            .option("dbtable", tableName)
-            .load()
-
-          checkAnswer(res, df)
+          DateTimeTestUtils.outstandingZoneIds.foreach { zoneId =>
+            DateTimeTestUtils.withDefaultTimeZone(zoneId) {
+              val res = spark.read.format("jdbc")
+                .option("inferTimestampNTZType", "true")
+                .option("url", urlWithUserAndPass)
+                .option("dbtable", tableName)
+                .load()
+
+              checkAnswer(res, df)
+            }
+          }
         }
       }
     }

