Posted to commits@spark.apache.org by li...@apache.org on 2018/05/18 20:38:39 UTC

spark git commit: [SPARK-23856][SQL] Add an option `queryTimeout` in JDBCOptions

Repository: spark
Updated Branches:
  refs/heads/master 3159ee085 -> a53ea70c1


[SPARK-23856][SQL] Add an option `queryTimeout` in JDBCOptions

## What changes were proposed in this pull request?
This PR adds an option `queryTimeout` that sets the number of seconds the driver will wait for a Statement object to execute.
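
For illustration, a minimal read-path sketch of the new option (the URL and table name below are placeholders, not part of this change):

    // Hypothetical usage: let the driver wait at most 10 seconds per statement.
    // "jdbc:h2:mem:testdb" and "TEST.PEOPLE" are placeholder values.
    val df = spark.read
      .format("jdbc")
      .option("url", "jdbc:h2:mem:testdb")
      .option("dbtable", "TEST.PEOPLE")
      .option("queryTimeout", "10")  // in seconds; 0 (the default) means no limit
      .load()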

## How was this patch tested?
Added tests in `JDBCSuite`.

Author: Takeshi Yamamuro <ya...@apache.org>

Closes #21173 from maropu/SPARK-23856.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a53ea70c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a53ea70c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a53ea70c

Branch: refs/heads/master
Commit: a53ea70c1d8903cdff051edf667b0127c8131a09
Parents: 3159ee0
Author: Takeshi Yamamuro <ya...@apache.org>
Authored: Fri May 18 13:38:36 2018 -0700
Committer: gatorsmile <ga...@gmail.com>
Committed: Fri May 18 13:38:36 2018 -0700

----------------------------------------------------------------------
 docs/sql-programming-guide.md                     | 11 +++++++++++
 .../org/apache/spark/sql/DataFrameReader.scala    |  3 ++-
 .../execution/datasources/jdbc/JDBCOptions.scala  |  5 +++++
 .../sql/execution/datasources/jdbc/JDBCRDD.scala  |  3 +++
 .../datasources/jdbc/JdbcRelationProvider.scala   |  2 +-
 .../execution/datasources/jdbc/JdbcUtils.scala    | 16 +++++++++++++---
 .../org/apache/spark/sql/jdbc/JDBCSuite.scala     | 16 ++++++++++++++++
 .../apache/spark/sql/jdbc/JDBCWriteSuite.scala    | 18 ++++++++++++++++++
 8 files changed, 69 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a53ea70c/docs/sql-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index b93d853..f1ed316 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -1339,6 +1339,17 @@ the following case-insensitive options:
   </tr>
 
   <tr>
+    <td><code>queryTimeout</code></td>
+    <td>
+      The number of seconds the driver will wait for a Statement object to execute.
+      Zero means there is no limit. In the write path, this option depends on how JDBC
+      drivers implement the API <code>setQueryTimeout</code>, e.g., the h2 JDBC driver
+      checks the timeout of each query instead of an entire JDBC batch.
+      It defaults to <code>0</code>.
+    </td>
+  </tr>
+
+  <tr>
     <td><code>fetchsize</code></td>
     <td>
       The JDBC fetch size, which determines how many rows to fetch per round trip. This can help performance on JDBC drivers which default to low fetch size (eg. Oracle with 10 rows). This option applies only to reading.
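
For the write path mentioned in the queryTimeout entry above, a hedged sketch (the URL, table name, and connection properties are placeholders, and whether the timeout covers each statement or a whole batch depends on the driver):

    // Hypothetical write-path usage; `df` is assumed to be an existing DataFrame,
    // and "jdbc:postgresql://host/db" / "public.target_table" are placeholders.
    import java.util.Properties
    df.write
      .mode("overwrite")
      .option("queryTimeout", "5")  // in seconds
      .jdbc("jdbc:postgresql://host/db", "public.target_table", new Properties())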

http://git-wip-us.apache.org/repos/asf/spark/blob/a53ea70c/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 53f4488..917f0cb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -257,7 +257,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    * @param connectionProperties JDBC database connection arguments, a list of arbitrary string
    *                             tag/value. Normally at least a "user" and "password" property
    *                             should be included. "fetchsize" can be used to control the
-   *                             number of rows per fetch.
+   *                             number of rows per fetch and "queryTimeout" can be used to set
+   *                             the number of seconds the driver waits for a Statement object to execute.
    * @since 1.4.0
    */
   def jdbc(

http://git-wip-us.apache.org/repos/asf/spark/blob/a53ea70c/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
index b4e5d16..a73a97c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
@@ -89,6 +89,10 @@ class JDBCOptions(
   // the number of partitions
   val numPartitions = parameters.get(JDBC_NUM_PARTITIONS).map(_.toInt)
 
+  // the number of seconds the driver will wait for a Statement object to execute.
+  // Zero means there is no limit.
+  val queryTimeout = parameters.getOrElse(JDBC_QUERY_TIMEOUT, "0").toInt
+
   // ------------------------------------------------------------
   // Optional parameters only for reading
   // ------------------------------------------------------------
@@ -160,6 +164,7 @@ object JDBCOptions {
   val JDBC_LOWER_BOUND = newOption("lowerBound")
   val JDBC_UPPER_BOUND = newOption("upperBound")
   val JDBC_NUM_PARTITIONS = newOption("numPartitions")
+  val JDBC_QUERY_TIMEOUT = newOption("queryTimeout")
   val JDBC_BATCH_FETCH_SIZE = newOption("fetchsize")
   val JDBC_TRUNCATE = newOption("truncate")
   val JDBC_CREATE_TABLE_OPTIONS = newOption("createTableOptions")

http://git-wip-us.apache.org/repos/asf/spark/blob/a53ea70c/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
index 0532621..0bab368 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
@@ -57,6 +57,7 @@ object JDBCRDD extends Logging {
     try {
       val statement = conn.prepareStatement(dialect.getSchemaQuery(table))
       try {
+        statement.setQueryTimeout(options.queryTimeout)
         val rs = statement.executeQuery()
         try {
           JdbcUtils.getSchema(rs, dialect, alwaysNullable = true)
@@ -281,6 +282,7 @@ private[jdbc] class JDBCRDD(
         val statement = conn.prepareStatement(sql)
         logInfo(s"Executing sessionInitStatement: $sql")
         try {
+          statement.setQueryTimeout(options.queryTimeout)
           statement.execute()
         } finally {
           statement.close()
@@ -298,6 +300,7 @@ private[jdbc] class JDBCRDD(
     stmt = conn.prepareStatement(sqlText,
         ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
     stmt.setFetchSize(options.fetchSize)
+    stmt.setQueryTimeout(options.queryTimeout)
     rs = stmt.executeQuery()
     val rowsIterator = JdbcUtils.resultSetToSparkInternalRows(rs, schema, inputMetrics)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/a53ea70c/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
index cc506e5..f8c5677 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
@@ -73,7 +73,7 @@ class JdbcRelationProvider extends CreatableRelationProvider
               saveTable(df, tableSchema, isCaseSensitive, options)
             } else {
               // Otherwise, do not truncate the table, instead drop and recreate it
-              dropTable(conn, options.table)
+              dropTable(conn, options.table, options)
               createTable(conn, df, options)
               saveTable(df, Some(df.schema), isCaseSensitive, options)
             }

http://git-wip-us.apache.org/repos/asf/spark/blob/a53ea70c/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index e6dc2fd..4334430 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -76,6 +76,7 @@ object JdbcUtils extends Logging {
     Try {
       val statement = conn.prepareStatement(dialect.getTableExistsQuery(options.table))
       try {
+        statement.setQueryTimeout(options.queryTimeout)
         statement.executeQuery()
       } finally {
         statement.close()
@@ -86,9 +87,10 @@ object JdbcUtils extends Logging {
   /**
    * Drops a table from the JDBC database.
    */
-  def dropTable(conn: Connection, table: String): Unit = {
+  def dropTable(conn: Connection, table: String, options: JDBCOptions): Unit = {
     val statement = conn.createStatement
     try {
+      statement.setQueryTimeout(options.queryTimeout)
       statement.executeUpdate(s"DROP TABLE $table")
     } finally {
       statement.close()
@@ -102,6 +104,7 @@ object JdbcUtils extends Logging {
     val dialect = JdbcDialects.get(options.url)
     val statement = conn.createStatement
     try {
+      statement.setQueryTimeout(options.queryTimeout)
       statement.executeUpdate(dialect.getTruncateQuery(options.table))
     } finally {
       statement.close()
@@ -254,6 +257,7 @@ object JdbcUtils extends Logging {
     try {
       val statement = conn.prepareStatement(dialect.getSchemaQuery(options.table))
       try {
+        statement.setQueryTimeout(options.queryTimeout)
         Some(getSchema(statement.executeQuery(), dialect))
       } catch {
         case _: SQLException => None
@@ -596,7 +600,8 @@ object JdbcUtils extends Logging {
       insertStmt: String,
       batchSize: Int,
       dialect: JdbcDialect,
-      isolationLevel: Int): Iterator[Byte] = {
+      isolationLevel: Int,
+      options: JDBCOptions): Iterator[Byte] = {
     val conn = getConnection()
     var committed = false
 
@@ -637,6 +642,9 @@ object JdbcUtils extends Logging {
 
       try {
         var rowCount = 0
+
+        stmt.setQueryTimeout(options.queryTimeout)
+
         while (iterator.hasNext) {
           val row = iterator.next()
           var i = 0
@@ -819,7 +827,8 @@ object JdbcUtils extends Logging {
       case _ => df
     }
     repartitionedDF.rdd.foreachPartition(iterator => savePartition(
-      getConnection, table, iterator, rddSchema, insertStmt, batchSize, dialect, isolationLevel)
+      getConnection, table, iterator, rddSchema, insertStmt, batchSize, dialect, isolationLevel,
+      options)
     )
   }
 
@@ -841,6 +850,7 @@ object JdbcUtils extends Logging {
     val sql = s"CREATE TABLE $table ($strSchema) $createTableOptions"
     val statement = conn.createStatement
     try {
+      statement.setQueryTimeout(options.queryTimeout)
       statement.executeUpdate(sql)
     } finally {
       statement.close()
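
Each of the call sites above simply forwards `options.queryTimeout` into `java.sql.Statement.setQueryTimeout`. As a standalone sketch of that underlying JDBC API (placeholder URL and query, not code from this commit):

    import java.sql.DriverManager

    // Hypothetical raw-JDBC sketch; "jdbc:h2:mem:testdb" is a placeholder.
    val conn = DriverManager.getConnection("jdbc:h2:mem:testdb")
    try {
      val stmt = conn.prepareStatement("SELECT * FROM TEST.PEOPLE")
      stmt.setQueryTimeout(10)  // if the query runs longer than 10 seconds, the
                                // driver is expected to cancel it and throw an SQLException
      val rs = stmt.executeQuery()
      while (rs.next()) {
        // consume the rows here
      }
    } finally {
      conn.close()
    }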

http://git-wip-us.apache.org/repos/asf/spark/blob/a53ea70c/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 5238adc..bc2aca6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -1190,4 +1190,20 @@ class JDBCSuite extends SparkFunSuite
       assert(sql("select * from people_view").schema === schema)
     }
   }
+
+  test("SPARK-23856 Spark jdbc setQueryTimeout option") {
+    val numJoins = 100
+    val longRunningQuery =
+      s"SELECT t0.NAME AS c0, ${(1 to numJoins).map(i => s"t$i.NAME AS c$i").mkString(", ")} " +
+        s"FROM test.people t0 ${(1 to numJoins).map(i => s"join test.people t$i").mkString(" ")}"
+    val df = spark.read.format("jdbc")
+      .option("Url", urlWithUserAndPass)
+      .option("dbtable", s"($longRunningQuery)")
+      .option("queryTimeout", 1)
+      .load()
+    val errMsg = intercept[SparkException] {
+      df.collect()
+    }.getMessage
+    assert(errMsg.contains("Statement was canceled or the session timed out"))
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/a53ea70c/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
index 1985b1d..1c2c92d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
@@ -515,4 +515,22 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter {
     }.getMessage
     assert(e.contains("NULL not allowed for column \"NAME\""))
   }
+
+  ignore("SPARK-23856 Spark jdbc setQueryTimeout option") {
+    // The behaviour of the option `queryTimeout` depends on how JDBC drivers implement the API
+    // `setQueryTimeout`. For example, in the h2 JDBC driver, `executeBatch` invokes multiple
+    // INSERT queries in a batch and `setQueryTimeout` means that the driver checks the timeout
+    // of each query. In the PostgreSQL JDBC driver, `setQueryTimeout` means that the driver
+    // checks the timeout of an entire batch on the driver side. So, the test below fails because
+    // this test suite depends on the h2 JDBC driver and the JDBC write path internally
+    // uses `executeBatch`.
+    val errMsg = intercept[SparkException] {
+      spark.range(10000000L).selectExpr("id AS k", "id AS v").coalesce(1).write
+        .mode(SaveMode.Overwrite)
+        .option("queryTimeout", 1)
+        .option("batchsize", Int.MaxValue)
+        .jdbc(url1, "TEST.TIMEOUTTEST", properties)
+    }.getMessage
+    assert(errMsg.contains("Statement was canceled or the session timed out"))
+  }
 }
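
To make the driver-dependence described in that comment concrete, a raw-JDBC sketch of a batched insert (placeholder URL and table; per the comment, h2 is said to check the timeout for each query in the batch, while PostgreSQL checks it for the batch as a whole):

    import java.sql.DriverManager

    // Hypothetical batched-insert sketch; URL and table name are placeholders.
    val conn = DriverManager.getConnection("jdbc:h2:mem:testdb")
    try {
      val stmt = conn.prepareStatement("INSERT INTO TEST.TIMEOUTTEST VALUES (?, ?)")
      stmt.setQueryTimeout(1)  // seconds; applied per query (h2) or per batch (PostgreSQL)
      (1L to 1000L).foreach { i =>
        stmt.setLong(1, i)
        stmt.setLong(2, i)
        stmt.addBatch()
      }
      stmt.executeBatch()  // a timeout surfaces as an SQLException from the driver
    } finally {
      conn.close()
    }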

