You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ya...@apache.org on 2024/03/06 03:22:34 UTC
(spark) branch master updated: [SPARK-47280][SQL] Remove timezone limitation for ORACLE TIMESTAMP WITH TIMEZONE
This is an automated email from the ASF dual-hosted git repository.
yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new d7cc708e1e0d [SPARK-47280][SQL] Remove timezone limitation for ORACLE TIMESTAMP WITH TIMEZONE
d7cc708e1e0d is described below
commit d7cc708e1e0dbda93f07832ae5e18cbc075f6431
Author: Kent Yao <ya...@apache.org>
AuthorDate: Wed Mar 6 11:22:22 2024 +0800
[SPARK-47280][SQL] Remove timezone limitation for ORACLE TIMESTAMP WITH TIMEZONE
### What changes were proposed in this pull request?
As illustrated by the Oracle [Documentation](https://docs.oracle.com/en/database/oracle/oracle-database/23/jjdbc/Oracle-extensions.html#GUID-DB1F687A-CF1C-4B3F-92C0-126DC782EF53):
> TIMESTAMP WITH TIME ZONE and TIMESTAMP WITH LOCAL TIME ZONE types can be represented
> as standard java.sql.Timestamp type.
> The byte representation of TIMESTAMP WITH TIME ZONE and TIMESTAMP WITH LOCAL TIME ZONE
> types to java.sql.Timestamp is straight forward.
> This is because the internal format of TIMESTAMP WITH TIME ZONE and TIMESTAMP WITH LOCAL
> TIME ZONE data types is GMT, and java.sql.Timestamp type objects internally use a
> milliseconds time value that is the number of milliseconds since EPOCH.
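Because we retrieve the value with `rs.getTimestamp` (rather than `rs.getString` or `rs.getTIMESTAMPTZ`), the result is an absolute instant (milliseconds since the epoch) that does not depend on the session time zone. It is therefore safe to remove the `spark.sql.session.timeZone` guard when mapping Oracle JDBC types to Catalyst types.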
### Why are the changes needed?
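Previously, reading an Oracle TIMESTAMP WITH TIME ZONE column failed with an `UNRECOGNIZED_SQL_TYPE` error whenever `spark.sql.session.timeZone` differed from the JVM default time zone. Removing this limitation improves the functionality of the Oracle connector.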
### Does this PR introduce _any_ user-facing change?
Yes. When reading Oracle TIMESTAMP WITH TIME ZONE columns, `spark.sql.session.timeZone` no longer has to equal the JVM default time zone. Such reads used to fail with `UNRECOGNIZED_SQL_TYPE` and now succeed.
### How was this patch tested?
Added a new test and updated an existing one in the Docker-based `OracleIntegrationSuite`.
### Was this patch authored or co-authored using generative AI tooling?
no
Closes #45384 from yaooqinn/SPARK-47280.
Authored-by: Kent Yao <ya...@apache.org>
Signed-off-by: Kent Yao <ya...@apache.org>
---
.../sql/jdbc/DockerJDBCIntegrationSuite.scala | 3 +-
.../spark/sql/jdbc/OracleIntegrationSuite.scala | 39 ++++++++--------------
.../org/apache/spark/sql/jdbc/OracleDialect.scala | 24 +++++--------
3 files changed, 25 insertions(+), 41 deletions(-)
diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DockerJDBCIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DockerJDBCIntegrationSuite.scala
index 5df7e00c9c3c..13db5844c604 100644
--- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DockerJDBCIntegrationSuite.scala
+++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DockerJDBCIntegrationSuite.scala
@@ -36,6 +36,7 @@ import com.github.dockerjava.zerodep.ZerodepDockerHttpClient
import org.scalatest.concurrent.{Eventually, PatienceConfiguration}
import org.scalatest.time.SpanSugar._
+import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.util.{DockerUtils, Utils}
import org.apache.spark.util.Utils.timeStringAsSeconds
@@ -99,7 +100,7 @@ abstract class DatabaseOnDocker {
}
abstract class DockerJDBCIntegrationSuite
- extends SharedSparkSession with Eventually with DockerIntegrationFunSuite {
+ extends QueryTest with SharedSparkSession with Eventually with DockerIntegrationFunSuite {
protected val dockerIp = DockerUtils.getDockerIp()
val db: DatabaseOnDocker
diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala
index 5ed0a2a1a7df..3b6e7faa164e 100644
--- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala
+++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala
@@ -23,7 +23,6 @@ import java.util.{Properties, TimeZone}
import org.scalatest.time.SpanSugar._
-import org.apache.spark.SparkSQLException
import org.apache.spark.sql.{Row, SaveMode}
import org.apache.spark.sql.catalyst.util.DateTimeTestUtils._
import org.apache.spark.sql.execution.{RowDataSourceScanExec, WholeStageCodegenExec}
@@ -69,6 +68,11 @@ class OracleIntegrationSuite extends DockerJDBCIntegrationSuite with SharedSpark
override val connectionTimeout = timeout(7.minutes)
+ private val rsOfTsWithTimezone = Seq(
+ Row(BigDecimal.valueOf(1), new Timestamp(944046000000L)),
+ Row(BigDecimal.valueOf(2), new Timestamp(944078400000L))
+ )
+
override def dataPreparation(conn: Connection): Unit = {
// In 18.4.0 Express Edition auto commit is enabled by default.
conn.setAutoCommit(false)
@@ -275,7 +279,7 @@ class OracleIntegrationSuite extends DockerJDBCIntegrationSuite with SharedSpark
assert(types(1).equals("class java.sql.Timestamp"))
}
- test("Column type TIMESTAMP with SESSION_LOCAL_TIMEZONE is different from default") {
+ test("SPARK-47280: Remove timezone limitation for ORACLE TIMESTAMP WITH TIMEZONE") {
val defaultJVMTimeZone = TimeZone.getDefault
// Pick the timezone different from the current default time zone of JVM
val sofiaTimeZone = TimeZone.getTimeZone("Europe/Sofia")
@@ -284,35 +288,20 @@ class OracleIntegrationSuite extends DockerJDBCIntegrationSuite with SharedSpark
if (defaultJVMTimeZone == shanghaiTimeZone) sofiaTimeZone else shanghaiTimeZone
withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> localSessionTimeZone.getID) {
- checkError(
- exception = intercept[SparkSQLException] {
- sqlContext.read.jdbc(jdbcUrl, "ts_with_timezone", new Properties).collect()
- },
- errorClass = "UNRECOGNIZED_SQL_TYPE",
- parameters = Map("typeName" -> "TIMESTAMP WITH TIME ZONE", "jdbcType" -> "-101"))
+ checkAnswer(
+ sqlContext.read.jdbc(jdbcUrl, "ts_with_timezone", new Properties),
+ rsOfTsWithTimezone)
}
}
test("Column TIMESTAMP with TIME ZONE(JVM timezone)") {
- def checkRow(row: Row, ts: String): Unit = {
- assert(row.getTimestamp(1).equals(Timestamp.valueOf(ts)))
- }
-
withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> TimeZone.getDefault.getID) {
val dfRead = sqlContext.read.jdbc(jdbcUrl, "ts_with_timezone", new Properties)
- withDefaultTimeZone(PST) {
- assert(dfRead.collect().toSet ===
- Set(
- Row(BigDecimal.valueOf(1), java.sql.Timestamp.valueOf("1999-12-01 03:00:00")),
- Row(BigDecimal.valueOf(2), java.sql.Timestamp.valueOf("1999-12-01 12:00:00"))))
- }
-
- withDefaultTimeZone(UTC) {
- assert(dfRead.collect().toSet ===
- Set(
- Row(BigDecimal.valueOf(1), java.sql.Timestamp.valueOf("1999-12-01 11:00:00")),
- Row(BigDecimal.valueOf(2), java.sql.Timestamp.valueOf("1999-12-01 20:00:00"))))
- }
+ Seq(PST, UTC).foreach(timeZone => {
+ withDefaultTimeZone(timeZone) {
+ checkAnswer(dfRead, rsOfTsWithTimezone)
+ }
+ })
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala
index 05c927416e7b..544c0197dec9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala
@@ -18,15 +18,13 @@
package org.apache.spark.sql.jdbc
import java.sql.{Date, Timestamp, Types}
-import java.util.{Locale, TimeZone}
+import java.util.Locale
import scala.util.control.NonFatal
import org.apache.spark.SparkUnsupportedOperationException
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.connector.expressions.Expression
import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
-import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
@@ -80,13 +78,6 @@ private case object OracleDialect extends JdbcDialect {
}
}
- private def supportTimeZoneTypes: Boolean = {
- val timeZone = DateTimeUtils.getTimeZone(SQLConf.get.sessionLocalTimeZone)
- // TODO: support timezone types when users are not using the JVM timezone, which
- // is the default value of SESSION_LOCAL_TIMEZONE
- timeZone == TimeZone.getDefault
- }
-
override def getCatalystType(
sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
sqlType match {
@@ -107,11 +98,14 @@ private case object OracleDialect extends JdbcDialect {
case _ if scale == -127L => Option(DecimalType(DecimalType.MAX_PRECISION, 10))
case _ => None
}
- case TIMESTAMP_TZ if supportTimeZoneTypes =>
- // Value for Timestamp with Time Zone in Oracle
- Some(TimestampType)
- case TIMESTAMP_LTZ =>
- // Value for Timestamp with Local Time Zone in Oracle
+ case TIMESTAMP_TZ | TIMESTAMP_LTZ =>
+ // TIMESTAMP WITH TIME ZONE and TIMESTAMP WITH LOCAL TIME ZONE types can be represented
+ // as standard java.sql.Timestamp type.
+ // The byte representation of TIMESTAMP WITH TIME ZONE and TIMESTAMP WITH LOCAL TIME ZONE
+ // types to java.sql.Timestamp is straight forward.
+ // This is because the internal format of TIMESTAMP WITH TIME ZONE and TIMESTAMP WITH LOCAL
+ // TIME ZONE data types is GMT, and java.sql.Timestamp type objects internally use a
+ // milliseconds time value that is the number of milliseconds since EPOCH.
Some(TimestampType)
case BINARY_FLOAT => Some(FloatType) // Value for OracleTypes.BINARY_FLOAT
case BINARY_DOUBLE => Some(DoubleType) // Value for OracleTypes.BINARY_DOUBLE
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org