Posted to commits@spark.apache.org by li...@apache.org on 2018/07/30 14:42:05 UTC

spark git commit: [SPARK-22814][SQL] Support Date/Timestamp in a JDBC partition column

Repository: spark
Updated Branches:
  refs/heads/master b90bfe3c4 -> 47d84e4d0


[SPARK-22814][SQL] Support Date/Timestamp in a JDBC partition column

## What changes were proposed in this pull request?
This PR adds support for Date/Timestamp in a JDBC partition column (only numeric columns are currently supported in master). It also reworks the code that verifies the partition column type;
```
val jdbcTable = spark.read
 .option("partitionColumn", "text")
 .option("lowerBound", "aaa")
 .option("upperBound", "zzz")
 .option("numPartitions", 2)
 .jdbc("jdbc:postgresql:postgres", "t", options)

// with this pr
org.apache.spark.sql.AnalysisException: Partition column type should be numeric, date, or timestamp, but string found.;
  at org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation$.verifyAndGetNormalizedPartitionColumn(JDBCRelation.scala:165)
  at org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation$.columnPartition(JDBCRelation.scala:85)
  at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:36)
  at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:317)

// without this pr
java.lang.NumberFormatException: For input string: "aaa"
  at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
  at java.lang.Long.parseLong(Long.java:589)
  at java.lang.Long.parseLong(Long.java:631)
  at scala.collection.immutable.StringLike$class.toLong(StringLike.scala:277)
```
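
With this change, a date or timestamp column can also be used as `partitionColumn`, with `lowerBound`/`upperBound` supplied as date/timestamp strings. A minimal sketch (the table `t` and its DATE column `d` are illustrative, mirroring the example above; the bounds only decide the partition stride, not which rows are returned):
```
// Sketch only: assumes table "t" has a DATE column "d".
val df = spark.read
 .option("partitionColumn", "d")
 .option("lowerBound", "2018-07-06")   // parsed as java.sql.Date
 .option("upperBound", "2018-07-20")
 .option("numPartitions", 3)
 .jdbc("jdbc:postgresql:postgres", "t", options)

// Each partition then reads with a WHERE clause such as
// "D" >= '2018-07-10' AND "D" < '2018-07-14' (see the added tests).
```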

Closes #19999

## How was this patch tested?
Added tests in `JDBCSuite` and `OracleIntegrationSuite`.

Author: Takeshi Yamamuro <ya...@apache.org>

Closes #21834 from maropu/SPARK-22814.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/47d84e4d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/47d84e4d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/47d84e4d

Branch: refs/heads/master
Commit: 47d84e4d0e56e14f9402770dceaf0b4302c00e98
Parents: b90bfe3
Author: Takeshi Yamamuro <ya...@apache.org>
Authored: Mon Jul 30 07:42:00 2018 -0700
Committer: Xiao Li <ga...@gmail.com>
Committed: Mon Jul 30 07:42:00 2018 -0700

----------------------------------------------------------------------
 docs/sql-programming-guide.md                   |   4 +-
 .../spark/sql/jdbc/OracleIntegrationSuite.scala |  86 +++++++++++++--
 .../spark/sql/catalyst/util/DateTimeUtils.scala |  10 +-
 .../datasources/PartitioningUtils.scala         |   2 +-
 .../datasources/jdbc/JDBCOptions.scala          |   4 +-
 .../datasources/jdbc/JDBCRelation.scala         | 107 +++++++++++++++----
 .../datasources/jdbc/JdbcRelationProvider.scala |  21 +---
 .../org/apache/spark/sql/jdbc/JDBCSuite.scala   |  77 ++++++++++++-
 8 files changed, 258 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/47d84e4d/docs/sql-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index 4b013c6..cff521c 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -1345,8 +1345,8 @@ the following case-insensitive options:
       These options must all be specified if any of them is specified. In addition,
       <code>numPartitions</code> must be specified. They describe how to partition the table when
       reading in parallel from multiple workers.
-      <code>partitionColumn</code> must be a numeric column from the table in question. Notice
-      that <code>lowerBound</code> and <code>upperBound</code> are just used to decide the
+      <code>partitionColumn</code> must be a numeric, date, or timestamp column from the table in question.
+      Notice that <code>lowerBound</code> and <code>upperBound</code> are just used to decide the
       partition stride, not for filtering the rows in table. So all rows in the table will be
       partitioned and returned. This option applies only to reading.
     </td>

http://git-wip-us.apache.org/repos/asf/spark/blob/47d84e4d/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala
----------------------------------------------------------------------
diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala
index 8512496..09a2cd8 100644
--- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala
+++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala
@@ -17,12 +17,14 @@
 
 package org.apache.spark.sql.jdbc
 
+import java.math.BigDecimal
 import java.sql.{Connection, Date, Timestamp}
 import java.util.{Properties, TimeZone}
-import java.math.BigDecimal
 
-import org.apache.spark.sql.{DataFrame, QueryTest, Row, SaveMode}
+import org.apache.spark.sql.{Row, SaveMode}
 import org.apache.spark.sql.execution.{RowDataSourceScanExec, WholeStageCodegenExec}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.execution.datasources.jdbc.{JDBCPartition, JDBCRelation}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
@@ -86,7 +88,8 @@ class OracleIntegrationSuite extends DockerJDBCIntegrationSuite with SharedSQLCo
     conn.prepareStatement(
       "CREATE TABLE tableWithCustomSchema (id NUMBER, n1 NUMBER(1), n2 NUMBER(1))").executeUpdate()
     conn.prepareStatement(
-      "INSERT INTO tableWithCustomSchema values(12312321321321312312312312123, 1, 0)").executeUpdate()
+      "INSERT INTO tableWithCustomSchema values(12312321321321312312312312123, 1, 0)")
+      .executeUpdate()
     conn.commit()
 
     sql(
@@ -108,15 +111,36 @@ class OracleIntegrationSuite extends DockerJDBCIntegrationSuite with SharedSQLCo
       """.stripMargin.replaceAll("\n", " "))
 
 
-    conn.prepareStatement("CREATE TABLE numerics (b DECIMAL(1), f DECIMAL(3, 2), i DECIMAL(10))").executeUpdate()
+    conn.prepareStatement("CREATE TABLE numerics (b DECIMAL(1), f DECIMAL(3, 2), i DECIMAL(10))")
+      .executeUpdate()
     conn.prepareStatement(
       "INSERT INTO numerics VALUES (4, 1.23, 9999999999)").executeUpdate()
     conn.commit()
 
-    conn.prepareStatement("CREATE TABLE oracle_types (d BINARY_DOUBLE, f BINARY_FLOAT)").executeUpdate()
+    conn.prepareStatement("CREATE TABLE oracle_types (d BINARY_DOUBLE, f BINARY_FLOAT)")
+      .executeUpdate()
     conn.commit()
-  }
 
+    conn.prepareStatement("CREATE TABLE datetimePartitionTest (id NUMBER(10), d DATE, t TIMESTAMP)")
+      .executeUpdate()
+    conn.prepareStatement(
+      """INSERT INTO datetimePartitionTest VALUES
+        |(1, {d '2018-07-06'}, {ts '2018-07-06 05:50:00'})
+      """.stripMargin.replaceAll("\n", " ")).executeUpdate()
+    conn.prepareStatement(
+      """INSERT INTO datetimePartitionTest VALUES
+        |(2, {d '2018-07-06'}, {ts '2018-07-06 08:10:08'})
+      """.stripMargin.replaceAll("\n", " ")).executeUpdate()
+    conn.prepareStatement(
+      """INSERT INTO datetimePartitionTest VALUES
+        |(3, {d '2018-07-08'}, {ts '2018-07-08 13:32:01'})
+      """.stripMargin.replaceAll("\n", " ")).executeUpdate()
+    conn.prepareStatement(
+      """INSERT INTO datetimePartitionTest VALUES
+        |(4, {d '2018-07-12'}, {ts '2018-07-12 09:51:15'})
+      """.stripMargin.replaceAll("\n", " ")).executeUpdate()
+    conn.commit()
+  }
 
   test("SPARK-16625 : Importing Oracle numeric types") {
     val df = sqlContext.read.jdbc(jdbcUrl, "numerics", new Properties)
@@ -399,4 +423,54 @@ class OracleIntegrationSuite extends DockerJDBCIntegrationSuite with SharedSQLCo
     assert(values.getDouble(0) === 1.1)
     assert(values.getFloat(1) === 2.2f)
   }
+
+  test("SPARK-22814 support date/timestamp types in partitionColumn") {
+    val expectedResult = Set(
+      (1, "2018-07-06", "2018-07-06 05:50:00"),
+      (2, "2018-07-06", "2018-07-06 08:10:08"),
+      (3, "2018-07-08", "2018-07-08 13:32:01"),
+      (4, "2018-07-12", "2018-07-12 09:51:15")
+    ).map { case (id, date, timestamp) =>
+      Row(BigDecimal.valueOf(id), Date.valueOf(date), Timestamp.valueOf(timestamp))
+    }
+
+    // DateType partition column
+    val df1 = spark.read.format("jdbc")
+      .option("url", jdbcUrl)
+      .option("dbtable", "datetimePartitionTest")
+      .option("partitionColumn", "d")
+      .option("lowerBound", "2018-07-06")
+      .option("upperBound", "2018-07-20")
+      .option("numPartitions", 3)
+      .load()
+
+    df1.logicalPlan match {
+      case LogicalRelation(JDBCRelation(_, parts, _), _, _, _) =>
+        val whereClauses = parts.map(_.asInstanceOf[JDBCPartition].whereClause).toSet
+        assert(whereClauses === Set(
+          """"D" < '2018-07-10' or "D" is null""",
+          """"D" >= '2018-07-10' AND "D" < '2018-07-14'""",
+          """"D" >= '2018-07-14'"""))
+    }
+    assert(df1.collect.toSet === expectedResult)
+
+    // TimestampType partition column
+    val df2 = spark.read.format("jdbc")
+      .option("url", jdbcUrl)
+      .option("dbtable", "datetimePartitionTest")
+      .option("partitionColumn", "t")
+      .option("lowerBound", "2018-07-04 03:30:00.0")
+      .option("upperBound", "2018-07-27 14:11:05.0")
+      .option("numPartitions", 2)
+      .load()
+
+    df2.logicalPlan match {
+      case LogicalRelation(JDBCRelation(_, parts, _), _, _, _) =>
+        val whereClauses = parts.map(_.asInstanceOf[JDBCPartition].whereClause).toSet
+        assert(whereClauses === Set(
+          """"T" < '2018-07-15 20:50:32.5' or "T" is null""",
+          """"T" >= '2018-07-15 20:50:32.5'"""))
+    }
+    assert(df2.collect.toSet === expectedResult)
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/47d84e4d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
index 80f1505..02813d3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
@@ -96,9 +96,9 @@ object DateTimeUtils {
     }
   }
 
-  def getThreadLocalDateFormat(): DateFormat = {
+  def getThreadLocalDateFormat(timeZone: TimeZone): DateFormat = {
     val sdf = threadLocalDateFormat.get()
-    sdf.setTimeZone(defaultTimeZone())
+    sdf.setTimeZone(timeZone)
     sdf
   }
 
@@ -144,7 +144,11 @@ object DateTimeUtils {
   }
 
   def dateToString(days: SQLDate): String =
-    getThreadLocalDateFormat.format(toJavaDate(days))
+    getThreadLocalDateFormat(defaultTimeZone()).format(toJavaDate(days))
+
+  def dateToString(days: SQLDate, timeZone: TimeZone): String = {
+    getThreadLocalDateFormat(timeZone).format(toJavaDate(days))
+  }
 
   // Converts Timestamp to string according to Hive TimestampWritable convention.
   def timestampToString(us: SQLTimestamp): String = {

http://git-wip-us.apache.org/repos/asf/spark/blob/47d84e4d/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index f9a2480..c8a5f98 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -410,7 +410,7 @@ object PartitioningUtils {
     val dateTry = Try {
       // try and parse the date, if no exception occurs this is a candidate to be resolved as
       // DateType
-      DateTimeUtils.getThreadLocalDateFormat.parse(raw)
+      DateTimeUtils.getThreadLocalDateFormat(DateTimeUtils.defaultTimeZone()).parse(raw)
       // SPARK-23436: Casting the string to date may still return null if a bad Date is provided.
       // This can happen since DateFormat.parse  may not use the entire text of the given string:
       // so if there are extra-characters after the date, it returns correctly.

http://git-wip-us.apache.org/repos/asf/spark/blob/47d84e4d/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
index d80efce..7dfbb9d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
@@ -119,9 +119,9 @@ class JDBCOptions(
   // the column used to partition
   val partitionColumn = parameters.get(JDBC_PARTITION_COLUMN)
   // the lower bound of partition column
-  val lowerBound = parameters.get(JDBC_LOWER_BOUND).map(_.toLong)
+  val lowerBound = parameters.get(JDBC_LOWER_BOUND)
   // the upper bound of the partition column
-  val upperBound = parameters.get(JDBC_UPPER_BOUND).map(_.toLong)
+  val upperBound = parameters.get(JDBC_UPPER_BOUND)
   // numPartitions is also used for data source writing
   require((partitionColumn.isEmpty && lowerBound.isEmpty && upperBound.isEmpty) ||
     (partitionColumn.isDefined && lowerBound.isDefined && upperBound.isDefined &&

http://git-wip-us.apache.org/repos/asf/spark/blob/47d84e4d/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
index 4f78f59..f150144 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution.datasources.jdbc
 
+import java.sql.{Date, Timestamp}
+
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.Partition
@@ -24,9 +26,10 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SaveMode, SparkSession, SQLContext}
 import org.apache.spark.sql.catalyst.analysis._
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.jdbc.JdbcDialects
 import org.apache.spark.sql.sources._
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{DataType, DateType, NumericType, StructType, TimestampType}
 import org.apache.spark.util.Utils
 
 /**
@@ -34,6 +37,7 @@ import org.apache.spark.util.Utils
  */
 private[sql] case class JDBCPartitioningInfo(
     column: String,
+    columnType: DataType,
     lowerBound: Long,
     upperBound: Long,
     numPartitions: Int)
@@ -51,16 +55,43 @@ private[sql] object JDBCRelation extends Logging {
    * the rows with null value for the partitions column.
    *
    * @param schema resolved schema of a JDBC table
-   * @param partitioning partition information to generate the where clause for each partition
    * @param resolver function used to determine if two identifiers are equal
+   * @param timeZoneId timezone ID to be used if a partition column type is date or timestamp
    * @param jdbcOptions JDBC options that contains url
    * @return an array of partitions with where clause for each partition
    */
   def columnPartition(
       schema: StructType,
-      partitioning: JDBCPartitioningInfo,
       resolver: Resolver,
+      timeZoneId: String,
       jdbcOptions: JDBCOptions): Array[Partition] = {
+    val partitioning = {
+      import JDBCOptions._
+
+      val partitionColumn = jdbcOptions.partitionColumn
+      val lowerBound = jdbcOptions.lowerBound
+      val upperBound = jdbcOptions.upperBound
+      val numPartitions = jdbcOptions.numPartitions
+
+      if (partitionColumn.isEmpty) {
+        assert(lowerBound.isEmpty && upperBound.isEmpty, "When 'partitionColumn' is not " +
+          s"specified, '$JDBC_LOWER_BOUND' and '$JDBC_UPPER_BOUND' are expected to be empty")
+        null
+      } else {
+        assert(lowerBound.nonEmpty && upperBound.nonEmpty && numPartitions.nonEmpty,
+          s"When 'partitionColumn' is specified, '$JDBC_LOWER_BOUND', '$JDBC_UPPER_BOUND', and " +
+            s"'$JDBC_NUM_PARTITIONS' are also required")
+
+        val (column, columnType) = verifyAndGetNormalizedPartitionColumn(
+          schema, partitionColumn.get, resolver, jdbcOptions)
+
+        val lowerBoundValue = toInternalBoundValue(lowerBound.get, columnType)
+        val upperBoundValue = toInternalBoundValue(upperBound.get, columnType)
+        JDBCPartitioningInfo(
+          column, columnType, lowerBoundValue, upperBoundValue, numPartitions.get)
+      }
+    }
+
     if (partitioning == null || partitioning.numPartitions <= 1 ||
       partitioning.lowerBound == partitioning.upperBound) {
       return Array[Partition](JDBCPartition(null, 0))
@@ -72,6 +103,8 @@ private[sql] object JDBCRelation extends Logging {
       "Operation not allowed: the lower bound of partitioning column is larger than the upper " +
       s"bound. Lower bound: $lowerBound; Upper bound: $upperBound")
 
+    val boundValueToString: Long => String =
+      toBoundValueInWhereClause(_, partitioning.columnType, timeZoneId)
     val numPartitions =
       if ((upperBound - lowerBound) >= partitioning.numPartitions || /* check for overflow */
           (upperBound - lowerBound) < 0) {
@@ -80,24 +113,25 @@ private[sql] object JDBCRelation extends Logging {
         logWarning("The number of partitions is reduced because the specified number of " +
           "partitions is less than the difference between upper bound and lower bound. " +
           s"Updated number of partitions: ${upperBound - lowerBound}; Input number of " +
-          s"partitions: ${partitioning.numPartitions}; Lower bound: $lowerBound; " +
-          s"Upper bound: $upperBound.")
+          s"partitions: ${partitioning.numPartitions}; " +
+          s"Lower bound: ${boundValueToString(lowerBound)}; " +
+          s"Upper bound: ${boundValueToString(upperBound)}.")
         upperBound - lowerBound
       }
     // Overflow and silliness can happen if you subtract then divide.
     // Here we get a little roundoff, but that's (hopefully) OK.
     val stride: Long = upperBound / numPartitions - lowerBound / numPartitions
 
-    val column = verifyAndGetNormalizedColumnName(
-      schema, partitioning.column, resolver, jdbcOptions)
-
     var i: Int = 0
-    var currentValue: Long = lowerBound
+    val column = partitioning.column
+    var currentValue = lowerBound
     val ans = new ArrayBuffer[Partition]()
     while (i < numPartitions) {
-      val lBound = if (i != 0) s"$column >= $currentValue" else null
+      val lBoundValue = boundValueToString(currentValue)
+      val lBound = if (i != 0) s"$column >= $lBoundValue" else null
       currentValue += stride
-      val uBound = if (i != numPartitions - 1) s"$column < $currentValue" else null
+      val uBoundValue = boundValueToString(currentValue)
+      val uBound = if (i != numPartitions - 1) s"$column < $uBoundValue" else null
       val whereClause =
         if (uBound == null) {
           lBound
@@ -109,23 +143,58 @@ private[sql] object JDBCRelation extends Logging {
       ans += JDBCPartition(whereClause, i)
       i = i + 1
     }
-    ans.toArray
+    val partitions = ans.toArray
+    logInfo(s"Number of partitions: $numPartitions, WHERE clauses of these partitions: " +
+      partitions.map(_.asInstanceOf[JDBCPartition].whereClause).mkString(", "))
+    partitions
   }
 
-  // Verify column name based on the JDBC resolved schema
-  private def verifyAndGetNormalizedColumnName(
+  // Verify column name and type based on the JDBC resolved schema
+  private def verifyAndGetNormalizedPartitionColumn(
       schema: StructType,
       columnName: String,
       resolver: Resolver,
-      jdbcOptions: JDBCOptions): String = {
+      jdbcOptions: JDBCOptions): (String, DataType) = {
     val dialect = JdbcDialects.get(jdbcOptions.url)
-    schema.map(_.name).find { fieldName =>
-      resolver(fieldName, columnName) ||
-        resolver(dialect.quoteIdentifier(fieldName), columnName)
-    }.map(dialect.quoteIdentifier).getOrElse {
+    val column = schema.find { f =>
+      resolver(f.name, columnName) || resolver(dialect.quoteIdentifier(f.name), columnName)
+    }.getOrElse {
       throw new AnalysisException(s"User-defined partition column $columnName not " +
         s"found in the JDBC relation: ${schema.simpleString(Utils.maxNumToStringFields)}")
     }
+    column.dataType match {
+      case _: NumericType | DateType | TimestampType =>
+      case _ =>
+        throw new AnalysisException(
+          s"Partition column type should be ${NumericType.simpleString}, " +
+            s"${DateType.catalogString}, or ${TimestampType.catalogString}, but " +
+            s"${column.dataType.catalogString} found.")
+    }
+    (dialect.quoteIdentifier(column.name), column.dataType)
+  }
+
+  private def toInternalBoundValue(value: String, columnType: DataType): Long = columnType match {
+    case _: NumericType => value.toLong
+    case DateType => DateTimeUtils.fromJavaDate(Date.valueOf(value)).toLong
+    case TimestampType => DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf(value))
+  }
+
+  private def toBoundValueInWhereClause(
+      value: Long,
+      columnType: DataType,
+      timeZoneId: String): String = {
+    def dateTimeToString(): String = {
+      val timeZone = DateTimeUtils.getTimeZone(timeZoneId)
+      val dateTimeStr = columnType match {
+        case DateType => DateTimeUtils.dateToString(value.toInt, timeZone)
+        case TimestampType => DateTimeUtils.timestampToString(value, timeZone)
+      }
+      s"'$dateTimeStr'"
+    }
+    columnType match {
+      case _: NumericType => value.toString
+      case DateType | TimestampType => dateTimeToString()
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/47d84e4d/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
index 782d626..e7456f9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
@@ -29,28 +29,11 @@ class JdbcRelationProvider extends CreatableRelationProvider
   override def createRelation(
       sqlContext: SQLContext,
       parameters: Map[String, String]): BaseRelation = {
-    import JDBCOptions._
-
     val jdbcOptions = new JDBCOptions(parameters)
-    val partitionColumn = jdbcOptions.partitionColumn
-    val lowerBound = jdbcOptions.lowerBound
-    val upperBound = jdbcOptions.upperBound
-    val numPartitions = jdbcOptions.numPartitions
-
-    val partitionInfo = if (partitionColumn.isEmpty) {
-      assert(lowerBound.isEmpty && upperBound.isEmpty, "When 'partitionColumn' is not specified, " +
-        s"'$JDBC_LOWER_BOUND' and '$JDBC_UPPER_BOUND' are expected to be empty")
-      null
-    } else {
-      assert(lowerBound.nonEmpty && upperBound.nonEmpty && numPartitions.nonEmpty,
-        s"When 'partitionColumn' is specified, '$JDBC_LOWER_BOUND', '$JDBC_UPPER_BOUND', and " +
-          s"'$JDBC_NUM_PARTITIONS' are also required")
-      JDBCPartitioningInfo(
-        partitionColumn.get, lowerBound.get, upperBound.get, numPartitions.get)
-    }
     val resolver = sqlContext.conf.resolver
+    val timeZoneId = sqlContext.conf.sessionLocalTimeZone
     val schema = JDBCRelation.getSchema(resolver, jdbcOptions)
-    val parts = JDBCRelation.columnPartition(schema, partitionInfo, resolver, jdbcOptions)
+    val parts = JDBCRelation.columnPartition(schema, resolver, timeZoneId, jdbcOptions)
     JDBCRelation(schema, parts, jdbcOptions)(sqlContext.sparkSession)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/47d84e4d/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 0edbd3a..7fa0e7f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -24,7 +24,7 @@ import java.util.{Calendar, GregorianCalendar, Properties}
 import org.h2.jdbc.JdbcSQLException
 import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
 
-import org.apache.spark.{SparkException, SparkFunSuite}
+import org.apache.spark.SparkException
 import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row}
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
@@ -244,6 +244,17 @@ class JDBCSuite extends QueryTest
       .executeUpdate()
     conn.commit()
 
+    conn.prepareStatement("CREATE TABLE test.datetime (d DATE, t TIMESTAMP)").executeUpdate()
+    conn.prepareStatement(
+      "INSERT INTO test.datetime VALUES ('2018-07-06', '2018-07-06 05:50:00.0')").executeUpdate()
+    conn.prepareStatement(
+      "INSERT INTO test.datetime VALUES ('2018-07-06', '2018-07-06 08:10:08.0')").executeUpdate()
+    conn.prepareStatement(
+      "INSERT INTO test.datetime VALUES ('2018-07-08', '2018-07-08 13:32:01.0')").executeUpdate()
+    conn.prepareStatement(
+      "INSERT INTO test.datetime VALUES ('2018-07-12', '2018-07-12 09:51:15.0')").executeUpdate()
+    conn.commit()
+
     // Untested: IDENTITY, OTHER, UUID, ARRAY, and GEOMETRY types.
   }
 
@@ -1375,7 +1386,71 @@ class JDBCSuite extends QueryTest
     checkAnswer(
       sql("select name, theid from queryOption"),
       Row("fred", 1) :: Nil)
+  }
+
+  test("SPARK-22814 support date/timestamp types in partitionColumn") {
+    val expectedResult = Seq(
+      ("2018-07-06", "2018-07-06 05:50:00.0"),
+      ("2018-07-06", "2018-07-06 08:10:08.0"),
+      ("2018-07-08", "2018-07-08 13:32:01.0"),
+      ("2018-07-12", "2018-07-12 09:51:15.0")
+    ).map { case (date, timestamp) =>
+      Row(Date.valueOf(date), Timestamp.valueOf(timestamp))
+    }
+
+    // DateType partition column
+    val df1 = spark.read.format("jdbc")
+      .option("url", urlWithUserAndPass)
+      .option("dbtable", "TEST.DATETIME")
+      .option("partitionColumn", "d")
+      .option("lowerBound", "2018-07-06")
+      .option("upperBound", "2018-07-20")
+      .option("numPartitions", 3)
+      .load()
 
+    df1.logicalPlan match {
+      case LogicalRelation(JDBCRelation(_, parts, _), _, _, _) =>
+        val whereClauses = parts.map(_.asInstanceOf[JDBCPartition].whereClause).toSet
+        assert(whereClauses === Set(
+          """"D" < '2018-07-10' or "D" is null""",
+          """"D" >= '2018-07-10' AND "D" < '2018-07-14'""",
+          """"D" >= '2018-07-14'"""))
+    }
+    checkAnswer(df1, expectedResult)
+
+    // TimestampType partition column
+    val df2 = spark.read.format("jdbc")
+      .option("url", urlWithUserAndPass)
+      .option("dbtable", "TEST.DATETIME")
+      .option("partitionColumn", "t")
+      .option("lowerBound", "2018-07-04 03:30:00.0")
+      .option("upperBound", "2018-07-27 14:11:05.0")
+      .option("numPartitions", 2)
+      .load()
+
+    df2.logicalPlan match {
+      case LogicalRelation(JDBCRelation(_, parts, _), _, _, _) =>
+        val whereClauses = parts.map(_.asInstanceOf[JDBCPartition].whereClause).toSet
+        assert(whereClauses === Set(
+          """"T" < '2018-07-15 20:50:32.5' or "T" is null""",
+          """"T" >= '2018-07-15 20:50:32.5'"""))
+    }
+    checkAnswer(df2, expectedResult)
+  }
+
+  test("throws an exception for unsupported partition column types") {
+    val errMsg = intercept[AnalysisException] {
+      spark.read.format("jdbc")
+        .option("url", urlWithUserAndPass)
+        .option("dbtable", "TEST.PEOPLE")
+        .option("partitionColumn", "name")
+        .option("lowerBound", "aaa")
+        .option("upperBound", "zzz")
+        .option("numPartitions", 2)
+        .load()
+    }.getMessage
+    assert(errMsg.contains(
+      "Partition column type should be numeric, date, or timestamp, but string found."))
   }
 
   test("SPARK-24288: Enable preventing predicate pushdown") {

