You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2019/01/23 12:23:53 UTC
[spark] branch master updated: [SPARK-26653][SQL] Use Proleptic
Gregorian calendar in parsing JDBC lower/upper bounds
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 46d5bb9 [SPARK-26653][SQL] Use Proleptic Gregorian calendar in parsing JDBC lower/upper bounds
46d5bb9 is described below
commit 46d5bb9a0fa4f44ea857e2cb15bf15acd773b839
Author: Maxim Gekk <ma...@databricks.com>
AuthorDate: Wed Jan 23 20:23:17 2019 +0800
[SPARK-26653][SQL] Use Proleptic Gregorian calendar in parsing JDBC lower/upper bounds
## What changes were proposed in this pull request?
In the PR, I propose using of the `stringToDate` and `stringToTimestamp` methods in parsing JDBC lower/upper bounds of the partition column if it has `DateType` or `TimestampType`. Since those methods have been ported on Proleptic Gregorian calendar by #23512, the PR switches parsing of JDBC bounds of the partition column on the calendar as well.
## How was this patch tested?
This was tested by `JDBCSuite`.
Closes #23597 from MaxGekk/jdbc-parse-timestamp-bounds.
Lead-authored-by: Maxim Gekk <ma...@databricks.com>
Co-authored-by: Maxim Gekk <ma...@gmail.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
docs/sql-migration-guide-upgrade.md | 2 ++
.../execution/datasources/jdbc/JDBCRelation.scala | 27 ++++++++++++-----
.../org/apache/spark/sql/jdbc/JDBCSuite.scala | 34 +++++++++++++++++++++-
3 files changed, 54 insertions(+), 9 deletions(-)
diff --git a/docs/sql-migration-guide-upgrade.md b/docs/sql-migration-guide-upgrade.md
index d442087..98fc9fa 100644
--- a/docs/sql-migration-guide-upgrade.md
+++ b/docs/sql-migration-guide-upgrade.md
@@ -93,6 +93,8 @@ displayTitle: Spark SQL Upgrading Guide
- Since Spark 3.0, the `weekofyear`, `weekday` and `dayofweek` functions use java.time API for calculation week number of year and day number of week based on Proleptic Gregorian calendar. In Spark version 2.4 and earlier, the hybrid calendar (Julian + Gregorian) is used for the same purpose. Results of the functions returned by Spark 3.0 and previous versions can be different for dates before October 15, 1582 (Gregorian).
+ - Since Spark 3.0, the JDBC options `lowerBound` and `upperBound` are converted to TimestampType/DateType values in the same way as casting strings to TimestampType/DateType values. The conversion is based on Proleptic Gregorian calendar, and time zone defined by the SQL config `spark.sql.session.timeZone`. In Spark version 2.4 and earlier, the conversion is based on the hybrid calendar (Julian + Gregorian) and on default system time zone.
+
## Upgrading From Spark SQL 2.3 to 2.4
- In Spark version 2.3 and earlier, the second parameter to array_contains function is implicitly promoted to the element type of first array type parameter. This type promotion can be lossy and may cause `array_contains` function to return wrong result. This problem has been addressed in 2.4 by employing a safer type promotion mechanism. This can cause some change in behavior and are illustrated in the table below.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
index 13ed105..c0f78b5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
@@ -17,8 +17,6 @@
package org.apache.spark.sql.execution.datasources.jdbc
-import java.sql.{Date, Timestamp}
-
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.Partition
@@ -27,10 +25,12 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SaveMode, SparkSession, SQLContext}
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeUtils, TimestampFormatter}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils.{getTimeZone, stringToDate, stringToTimestamp}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.jdbc.JdbcDialects
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.{DataType, DateType, NumericType, StructType, TimestampType}
+import org.apache.spark.unsafe.types.UTF8String
/**
* Instructions on how to partition the table among workers.
@@ -85,8 +85,8 @@ private[sql] object JDBCRelation extends Logging {
val (column, columnType) = verifyAndGetNormalizedPartitionColumn(
schema, partitionColumn.get, resolver, jdbcOptions)
- val lowerBoundValue = toInternalBoundValue(lowerBound.get, columnType)
- val upperBoundValue = toInternalBoundValue(upperBound.get, columnType)
+ val lowerBoundValue = toInternalBoundValue(lowerBound.get, columnType, timeZoneId)
+ val upperBoundValue = toInternalBoundValue(upperBound.get, columnType, timeZoneId)
JDBCPartitioningInfo(
column, columnType, lowerBoundValue, upperBoundValue, numPartitions.get)
}
@@ -174,10 +174,21 @@ private[sql] object JDBCRelation extends Logging {
(dialect.quoteIdentifier(column.name), column.dataType)
}
- private def toInternalBoundValue(value: String, columnType: DataType): Long = columnType match {
- case _: NumericType => value.toLong
- case DateType => DateTimeUtils.fromJavaDate(Date.valueOf(value)).toLong
- case TimestampType => DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf(value))
+ private def toInternalBoundValue(
+ value: String,
+ columnType: DataType,
+ timeZoneId: String): Long = {
+ def parse[T](f: UTF8String => Option[T]): T = {
+ f(UTF8String.fromString(value)).getOrElse {
+ throw new IllegalArgumentException(
+ s"Cannot parse the bound value $value as ${columnType.catalogString}")
+ }
+ }
+ columnType match {
+ case _: NumericType => value.toLong
+ case DateType => parse(stringToDate).toLong
+ case TimestampType => parse(stringToTimestamp(_, getTimeZone(timeZoneId)))
+ }
}
private def toBoundValueInWhereClause(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 284900b..a4dc537 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -27,7 +27,7 @@ import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
import org.apache.spark.SparkException
import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row}
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
-import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeTestUtils}
import org.apache.spark.sql.execution.DataSourceScanExec
import org.apache.spark.sql.execution.command.ExplainCommand
import org.apache.spark.sql.execution.datasources.LogicalRelation
@@ -1523,4 +1523,36 @@ class JDBCSuite extends QueryTest
assert(e.contains("The driver could not open a JDBC connection. " +
"Check the URL: jdbc:mysql://localhost/db"))
}
+
+ test("support casting patterns for lower/upper bounds of TimestampType") {
+ DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone =>
+ withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> timeZone) {
+ Seq(
+ ("1972-07-04 03:30:00", "1972-07-15 20:50:32.5", "1972-07-27 14:11:05"),
+ ("2019-01-20 12:00:00.502", "2019-01-20 12:00:00.751", "2019-01-20 12:00:01.000"),
+ ("2019-01-20T00:00:00.123456", "2019-01-20 00:05:00.123456",
+ "2019-01-20T00:10:00.123456"),
+ ("1500-01-20T00:00:00.123456", "1500-01-20 00:05:00.123456", "1500-01-20T00:10:00.123456")
+ ).foreach { case (lower, middle, upper) =>
+ val df = spark.read.format("jdbc")
+ .option("url", urlWithUserAndPass)
+ .option("dbtable", "TEST.DATETIME")
+ .option("partitionColumn", "t")
+ .option("lowerBound", lower)
+ .option("upperBound", upper)
+ .option("numPartitions", 2)
+ .load()
+
+ df.logicalPlan match {
+ case lr: LogicalRelation if lr.relation.isInstanceOf[JDBCRelation] =>
+ val jdbcRelation = lr.relation.asInstanceOf[JDBCRelation]
+ val whereClauses = jdbcRelation.parts.map(_.asInstanceOf[JDBCPartition].whereClause)
+ assert(whereClauses.toSet === Set(
+ s""""T" < '$middle' or "T" is null""",
+ s""""T" >= '$middle'"""))
+ }
+ }
+ }
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org