Posted to commits@spark.apache.org by we...@apache.org on 2019/01/23 12:23:53 UTC

[spark] branch master updated: [SPARK-26653][SQL] Use Proleptic Gregorian calendar in parsing JDBC lower/upper bounds

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 46d5bb9  [SPARK-26653][SQL] Use Proleptic Gregorian calendar in parsing JDBC lower/upper bounds
46d5bb9 is described below

commit 46d5bb9a0fa4f44ea857e2cb15bf15acd773b839
Author: Maxim Gekk <ma...@databricks.com>
AuthorDate: Wed Jan 23 20:23:17 2019 +0800

    [SPARK-26653][SQL] Use Proleptic Gregorian calendar in parsing JDBC lower/upper bounds
    
    ## What changes were proposed in this pull request?
    
    In the PR, I propose using the `stringToDate` and `stringToTimestamp` methods to parse the JDBC lower/upper bounds of the partition column when it has `DateType` or `TimestampType`. Since those methods were ported to the Proleptic Gregorian calendar by #23512, this PR switches parsing of the JDBC bounds of the partition column to that calendar as well.
    
    ## How was this patch tested?
    
    This was tested by a new test case added to `JDBCSuite`.
    
    Closes #23597 from MaxGekk/jdbc-parse-timestamp-bounds.
    
    Lead-authored-by: Maxim Gekk <ma...@databricks.com>
    Co-authored-by: Maxim Gekk <ma...@gmail.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 docs/sql-migration-guide-upgrade.md                |  2 ++
 .../execution/datasources/jdbc/JDBCRelation.scala  | 27 ++++++++++++-----
 .../org/apache/spark/sql/jdbc/JDBCSuite.scala      | 34 +++++++++++++++++++++-
 3 files changed, 54 insertions(+), 9 deletions(-)

diff --git a/docs/sql-migration-guide-upgrade.md b/docs/sql-migration-guide-upgrade.md
index d442087..98fc9fa 100644
--- a/docs/sql-migration-guide-upgrade.md
+++ b/docs/sql-migration-guide-upgrade.md
@@ -93,6 +93,8 @@ displayTitle: Spark SQL Upgrading Guide
 
   - Since Spark 3.0, the `weekofyear`, `weekday` and `dayofweek` functions use java.time API for calculation week number of year and day number of week based on Proleptic Gregorian calendar. In Spark version 2.4 and earlier, the hybrid calendar (Julian + Gregorian) is used for the same purpose. Results of the functions returned by Spark 3.0 and previous versions can be different for dates before October 15, 1582 (Gregorian).
 
+  - Since Spark 3.0, the JDBC options `lowerBound` and `upperBound` are converted to TimestampType/DateType values in the same way as casting strings to TimestampType/DateType values. The conversion is based on the Proleptic Gregorian calendar and the time zone defined by the SQL config `spark.sql.session.timeZone`. In Spark version 2.4 and earlier, the conversion is based on the hybrid calendar (Julian + Gregorian) and on the default system time zone.
+
 ## Upgrading From Spark SQL 2.3 to 2.4
 
   - In Spark version 2.3 and earlier, the second parameter to array_contains function is implicitly promoted to the element type of first array type parameter. This type promotion can be lossy and may cause `array_contains` function to return wrong result. This problem has been addressed in 2.4 by employing a safer type promotion mechanism. This can cause some change in behavior and are illustrated in the table below.
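
A minimal user-facing sketch of the behavior documented above, not part of the patch: the connection URL and the `TEST.DATETIME` table with timestamp column `t` are assumptions borrowed from `JDBCSuite`, and the bound values come from the new test data. Since Spark 3.0 the string bounds are parsed like a cast to TIMESTAMP, using the Proleptic Gregorian calendar and the session time zone.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()

// The session time zone now controls how the string bounds below are interpreted.
spark.conf.set("spark.sql.session.timeZone", "UTC")

val df = spark.read.format("jdbc")
  .option("url", "jdbc:h2:mem:testdb0")      // illustrative URL
  .option("dbtable", "TEST.DATETIME")        // table and column assumed from JDBCSuite
  .option("partitionColumn", "t")
  // Parsed with the Proleptic Gregorian calendar in spark.sql.session.timeZone.
  .option("lowerBound", "2019-01-20 12:00:00.502")
  .option("upperBound", "2019-01-20 12:00:01.000")
  .option("numPartitions", 2)
  .load()
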
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
index 13ed105..c0f78b5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.sql.execution.datasources.jdbc
 
-import java.sql.{Date, Timestamp}
-
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.Partition
@@ -27,10 +25,12 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SaveMode, SparkSession, SQLContext}
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeUtils, TimestampFormatter}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils.{getTimeZone, stringToDate, stringToTimestamp}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.jdbc.JdbcDialects
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{DataType, DateType, NumericType, StructType, TimestampType}
+import org.apache.spark.unsafe.types.UTF8String
 
 /**
  * Instructions on how to partition the table among workers.
@@ -85,8 +85,8 @@ private[sql] object JDBCRelation extends Logging {
         val (column, columnType) = verifyAndGetNormalizedPartitionColumn(
           schema, partitionColumn.get, resolver, jdbcOptions)
 
-        val lowerBoundValue = toInternalBoundValue(lowerBound.get, columnType)
-        val upperBoundValue = toInternalBoundValue(upperBound.get, columnType)
+        val lowerBoundValue = toInternalBoundValue(lowerBound.get, columnType, timeZoneId)
+        val upperBoundValue = toInternalBoundValue(upperBound.get, columnType, timeZoneId)
         JDBCPartitioningInfo(
           column, columnType, lowerBoundValue, upperBoundValue, numPartitions.get)
       }
@@ -174,10 +174,21 @@ private[sql] object JDBCRelation extends Logging {
     (dialect.quoteIdentifier(column.name), column.dataType)
   }
 
-  private def toInternalBoundValue(value: String, columnType: DataType): Long = columnType match {
-    case _: NumericType => value.toLong
-    case DateType => DateTimeUtils.fromJavaDate(Date.valueOf(value)).toLong
-    case TimestampType => DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf(value))
+  private def toInternalBoundValue(
+      value: String,
+      columnType: DataType,
+      timeZoneId: String): Long = {
+    def parse[T](f: UTF8String => Option[T]): T = {
+      f(UTF8String.fromString(value)).getOrElse {
+        throw new IllegalArgumentException(
+          s"Cannot parse the bound value $value as ${columnType.catalogString}")
+      }
+    }
+    columnType match {
+      case _: NumericType => value.toLong
+      case DateType => parse(stringToDate).toLong
+      case TimestampType => parse(stringToTimestamp(_, getTimeZone(timeZoneId)))
+    }
   }
 
   private def toBoundValueInWhereClause(
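
For reference, a standalone sketch (not part of the patch) of what the new `toInternalBoundValue` does for a timestamp bound, using only the helpers the diff imports; the input string and time zone are illustrative:

import org.apache.spark.sql.catalyst.util.DateTimeUtils.{getTimeZone, stringToTimestamp}
import org.apache.spark.unsafe.types.UTF8String

// Parse the bound with the casting rules: Proleptic Gregorian calendar, given time zone.
// The result is the internal microseconds-since-epoch value, or None if the string does
// not match the supported patterns; the patch turns None into an IllegalArgumentException.
val micros: Option[Long] =
  stringToTimestamp(UTF8String.fromString("2019-01-20T00:00:00.123456"), getTimeZone("UTC"))
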
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 284900b..a4dc537 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -27,7 +27,7 @@ import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
 import org.apache.spark.SparkException
 import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row}
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
-import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeTestUtils}
 import org.apache.spark.sql.execution.DataSourceScanExec
 import org.apache.spark.sql.execution.command.ExplainCommand
 import org.apache.spark.sql.execution.datasources.LogicalRelation
@@ -1523,4 +1523,36 @@ class JDBCSuite extends QueryTest
     assert(e.contains("The driver could not open a JDBC connection. " +
       "Check the URL: jdbc:mysql://localhost/db"))
   }
+
+  test("support casting patterns for lower/upper bounds of TimestampType") {
+    DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone =>
+      withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> timeZone) {
+        Seq(
+          ("1972-07-04 03:30:00", "1972-07-15 20:50:32.5", "1972-07-27 14:11:05"),
+          ("2019-01-20 12:00:00.502", "2019-01-20 12:00:00.751", "2019-01-20 12:00:01.000"),
+          ("2019-01-20T00:00:00.123456", "2019-01-20 00:05:00.123456",
+            "2019-01-20T00:10:00.123456"),
+          ("1500-01-20T00:00:00.123456", "1500-01-20 00:05:00.123456", "1500-01-20T00:10:00.123456")
+        ).foreach { case (lower, middle, upper) =>
+          val df = spark.read.format("jdbc")
+            .option("url", urlWithUserAndPass)
+            .option("dbtable", "TEST.DATETIME")
+            .option("partitionColumn", "t")
+            .option("lowerBound", lower)
+            .option("upperBound", upper)
+            .option("numPartitions", 2)
+            .load()
+
+          df.logicalPlan match {
+            case lr: LogicalRelation if lr.relation.isInstanceOf[JDBCRelation] =>
+              val jdbcRelation = lr.relation.asInstanceOf[JDBCRelation]
+              val whereClauses = jdbcRelation.parts.map(_.asInstanceOf[JDBCPartition].whereClause)
+              assert(whereClauses.toSet === Set(
+                s""""T" < '$middle' or "T" is null""",
+                s""""T" >= '$middle'"""))
+          }
+        }
+      }
+    }
+  }
 }
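
For context on the assertion in the new test: with `numPartitions = 2` there is a single split point, which for the first row's bounds 1972-07-04 03:30:00 and 1972-07-27 14:11:05 is the `middle` value 1972-07-15 20:50:32.5, so `whereClauses` is expected to contain exactly:

  "T" < '1972-07-15 20:50:32.5' or "T" is null
  "T" >= '1972-07-15 20:50:32.5'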

