You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ya...@apache.org on 2024/03/06 03:22:34 UTC

(spark) branch master updated: [SPARK-47280][SQL] Remove timezone limitation for ORACLE TIMESTAMP WITH TIMEZONE

This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new d7cc708e1e0d [SPARK-47280][SQL] Remove timezone limitation for ORACLE TIMESTAMP WITH TIMEZONE
d7cc708e1e0d is described below

commit d7cc708e1e0dbda93f07832ae5e18cbc075f6431
Author: Kent Yao <ya...@apache.org>
AuthorDate: Wed Mar 6 11:22:22 2024 +0800

    [SPARK-47280][SQL] Remove timezone limitation for ORACLE TIMESTAMP WITH TIMEZONE
    
    ### What changes were proposed in this pull request?
    
    As illustrated by the Oracle [Documentation](https://docs.oracle.com/en/database/oracle/oracle-database/23/jjdbc/Oracle-extensions.html#GUID-DB1F687A-CF1C-4B3F-92C0-126DC782EF53):
    
    > TIMESTAMP WITH TIME ZONE and TIMESTAMP WITH LOCAL TIME ZONE types can be represented
    as standard java.sql.Timestamp type.
    The byte representation of TIMESTAMP WITH TIME ZONE and TIMESTAMP WITH LOCAL TIME ZONE
    types to java.sql.Timestamp is straight forward.
    This is because the internal format of TIMESTAMP WITH TIME ZONE and TIMESTAMP WITH LOCAL
    TIME ZONE data types is GMT, and java.sql.Timestamp type objects internally use a
    milliseconds time value that is the number of milliseconds since EPOCH.
    
    As we use `rs.getTimestamp` instead of `rs.getString` or `rs.getTIMESTAMPTZ` to retrieve the timestamp value, it's safe to remove the `spark.sql.session.timeZone` guard when mapping catalyst type to oracle/jdbc types.
    
    ### Why are the changes needed?
    
    Improve the functionality of the Oracle connector
    
    ### Does this PR introduce _any_ user-facing change?
    
    When reading Oracle TIMESTAMP WITH TIMEZONE, the `spark.sql.session.timeZone` can be changed without restriction for equivalence of JVM default.
    
    ### How was this patch tested?
    
    new unit tests added
    
    ### Was this patch authored or co-authored using generative AI tooling?
    no
    
    Closes #45384 from yaooqinn/SPARK-47280.
    
    Authored-by: Kent Yao <ya...@apache.org>
    Signed-off-by: Kent Yao <ya...@apache.org>
---
 .../sql/jdbc/DockerJDBCIntegrationSuite.scala      |  3 +-
 .../spark/sql/jdbc/OracleIntegrationSuite.scala    | 39 ++++++++--------------
 .../org/apache/spark/sql/jdbc/OracleDialect.scala  | 24 +++++--------
 3 files changed, 25 insertions(+), 41 deletions(-)

diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DockerJDBCIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DockerJDBCIntegrationSuite.scala
index 5df7e00c9c3c..13db5844c604 100644
--- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DockerJDBCIntegrationSuite.scala
+++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DockerJDBCIntegrationSuite.scala
@@ -36,6 +36,7 @@ import com.github.dockerjava.zerodep.ZerodepDockerHttpClient
 import org.scalatest.concurrent.{Eventually, PatienceConfiguration}
 import org.scalatest.time.SpanSugar._
 
+import org.apache.spark.sql.QueryTest
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.util.{DockerUtils, Utils}
 import org.apache.spark.util.Utils.timeStringAsSeconds
@@ -99,7 +100,7 @@ abstract class DatabaseOnDocker {
 }
 
 abstract class DockerJDBCIntegrationSuite
-  extends SharedSparkSession with Eventually with DockerIntegrationFunSuite {
+  extends QueryTest with SharedSparkSession with Eventually with DockerIntegrationFunSuite {
 
   protected val dockerIp = DockerUtils.getDockerIp()
   val db: DatabaseOnDocker
diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala
index 5ed0a2a1a7df..3b6e7faa164e 100644
--- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala
+++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala
@@ -23,7 +23,6 @@ import java.util.{Properties, TimeZone}
 
 import org.scalatest.time.SpanSugar._
 
-import org.apache.spark.SparkSQLException
 import org.apache.spark.sql.{Row, SaveMode}
 import org.apache.spark.sql.catalyst.util.DateTimeTestUtils._
 import org.apache.spark.sql.execution.{RowDataSourceScanExec, WholeStageCodegenExec}
@@ -69,6 +68,11 @@ class OracleIntegrationSuite extends DockerJDBCIntegrationSuite with SharedSpark
 
   override val connectionTimeout = timeout(7.minutes)
 
+  private val rsOfTsWithTimezone = Seq(
+    Row(BigDecimal.valueOf(1), new Timestamp(944046000000L)),
+    Row(BigDecimal.valueOf(2), new Timestamp(944078400000L))
+  )
+
   override def dataPreparation(conn: Connection): Unit = {
     // In 18.4.0 Express Edition auto commit is enabled by default.
     conn.setAutoCommit(false)
@@ -275,7 +279,7 @@ class OracleIntegrationSuite extends DockerJDBCIntegrationSuite with SharedSpark
     assert(types(1).equals("class java.sql.Timestamp"))
   }
 
-  test("Column type TIMESTAMP with SESSION_LOCAL_TIMEZONE is different from default") {
+  test("SPARK-47280: Remove timezone limitation for ORACLE TIMESTAMP WITH TIMEZONE") {
     val defaultJVMTimeZone = TimeZone.getDefault
     // Pick the timezone different from the current default time zone of JVM
     val sofiaTimeZone = TimeZone.getTimeZone("Europe/Sofia")
@@ -284,35 +288,20 @@ class OracleIntegrationSuite extends DockerJDBCIntegrationSuite with SharedSpark
       if (defaultJVMTimeZone == shanghaiTimeZone) sofiaTimeZone else shanghaiTimeZone
 
     withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> localSessionTimeZone.getID) {
-      checkError(
-        exception = intercept[SparkSQLException] {
-          sqlContext.read.jdbc(jdbcUrl, "ts_with_timezone", new Properties).collect()
-        },
-        errorClass = "UNRECOGNIZED_SQL_TYPE",
-        parameters = Map("typeName" -> "TIMESTAMP WITH TIME ZONE", "jdbcType" -> "-101"))
+      checkAnswer(
+        sqlContext.read.jdbc(jdbcUrl, "ts_with_timezone", new Properties),
+        rsOfTsWithTimezone)
     }
   }
 
   test("Column TIMESTAMP with TIME ZONE(JVM timezone)") {
-    def checkRow(row: Row, ts: String): Unit = {
-      assert(row.getTimestamp(1).equals(Timestamp.valueOf(ts)))
-    }
-
     withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> TimeZone.getDefault.getID) {
       val dfRead = sqlContext.read.jdbc(jdbcUrl, "ts_with_timezone", new Properties)
-      withDefaultTimeZone(PST) {
-        assert(dfRead.collect().toSet ===
-          Set(
-            Row(BigDecimal.valueOf(1), java.sql.Timestamp.valueOf("1999-12-01 03:00:00")),
-            Row(BigDecimal.valueOf(2), java.sql.Timestamp.valueOf("1999-12-01 12:00:00"))))
-      }
-
-      withDefaultTimeZone(UTC) {
-        assert(dfRead.collect().toSet ===
-          Set(
-            Row(BigDecimal.valueOf(1), java.sql.Timestamp.valueOf("1999-12-01 11:00:00")),
-            Row(BigDecimal.valueOf(2), java.sql.Timestamp.valueOf("1999-12-01 20:00:00"))))
-      }
+      Seq(PST, UTC).foreach(timeZone => {
+        withDefaultTimeZone(timeZone) {
+          checkAnswer(dfRead, rsOfTsWithTimezone)
+        }
+      })
     }
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala
index 05c927416e7b..544c0197dec9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala
@@ -18,15 +18,13 @@
 package org.apache.spark.sql.jdbc
 
 import java.sql.{Date, Timestamp, Types}
-import java.util.{Locale, TimeZone}
+import java.util.Locale
 
 import scala.util.control.NonFatal
 
 import org.apache.spark.SparkUnsupportedOperationException
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.connector.expressions.Expression
 import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
-import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 
 
@@ -80,13 +78,6 @@ private case object OracleDialect extends JdbcDialect {
     }
   }
 
-  private def supportTimeZoneTypes: Boolean = {
-    val timeZone = DateTimeUtils.getTimeZone(SQLConf.get.sessionLocalTimeZone)
-    // TODO: support timezone types when users are not using the JVM timezone, which
-    // is the default value of SESSION_LOCAL_TIMEZONE
-    timeZone == TimeZone.getDefault
-  }
-
   override def getCatalystType(
       sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
     sqlType match {
@@ -107,11 +98,14 @@ private case object OracleDialect extends JdbcDialect {
           case _ if scale == -127L => Option(DecimalType(DecimalType.MAX_PRECISION, 10))
           case _ => None
         }
-      case TIMESTAMP_TZ if supportTimeZoneTypes =>
-        // Value for Timestamp with Time Zone in Oracle
-        Some(TimestampType)
-      case TIMESTAMP_LTZ =>
-        // Value for Timestamp with Local Time Zone in Oracle
+      case TIMESTAMP_TZ | TIMESTAMP_LTZ =>
+        // TIMESTAMP WITH TIME ZONE and TIMESTAMP WITH LOCAL TIME ZONE types can be represented
+        // as standard java.sql.Timestamp type.
+        // The byte representation of TIMESTAMP WITH TIME ZONE and TIMESTAMP WITH LOCAL TIME ZONE
+        // types to java.sql.Timestamp is straight forward.
+        // This is because the internal format of TIMESTAMP WITH TIME ZONE and TIMESTAMP WITH LOCAL
+        // TIME ZONE data types is GMT, and java.sql.Timestamp type objects internally use a
+        // milliseconds time value that is the number of milliseconds since EPOCH.
         Some(TimestampType)
       case BINARY_FLOAT => Some(FloatType) // Value for OracleTypes.BINARY_FLOAT
       case BINARY_DOUBLE => Some(DoubleType) // Value for OracleTypes.BINARY_DOUBLE


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org