Posted to commits@spark.apache.org by rx...@apache.org on 2016/11/25 18:35:10 UTC

spark git commit: [SPARK-18413][SQL][FOLLOW-UP] Use `numPartitions` instead of `maxConnections`

Repository: spark
Updated Branches:
  refs/heads/master 445d4d9e1 -> fb07bbe57


[SPARK-18413][SQL][FOLLOW-UP] Use `numPartitions` instead of `maxConnections`

## What changes were proposed in this pull request?

This is a follow-up to PR #15868 that merges the `maxConnections` option into the `numPartitions` option.
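
For illustration only (not part of this patch), assuming an existing DataFrame `df` and placeholder connection details, a write that previously capped parallelism with `maxConnections` now expresses the same intent with `numPartitions`:

```scala
// Hypothetical usage sketch; the URL and table name are placeholders.
// Before this patch, the cap was set with: .option("maxConnections", "8")
df.write
  .format("jdbc")
  .option("url", "jdbc:postgresql://localhost/test")
  .option("dbtable", "TEST.SAVETEST")
  .option("numPartitions", "8") // also caps concurrent JDBC connections
  .save()
```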

## How was this patch tested?

Passes the existing tests.

Author: Dongjoon Hyun <do...@apache.org>

Closes #15966 from dongjoon-hyun/SPARK-18413-2.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fb07bbe5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fb07bbe5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fb07bbe5

Branch: refs/heads/master
Commit: fb07bbe575aabe68422fd3a31865101fb7fa1722
Parents: 445d4d9
Author: Dongjoon Hyun <do...@apache.org>
Authored: Fri Nov 25 10:35:07 2016 -0800
Committer: Reynold Xin <rx...@databricks.com>
Committed: Fri Nov 25 10:35:07 2016 -0800

----------------------------------------------------------------------
 docs/sql-programming-guide.md                   | 24 ++++++++++++--------
 .../datasources/jdbc/JDBCOptions.scala          | 24 +++++++++-----------
 .../datasources/jdbc/JdbcRelationProvider.scala |  6 +++--
 .../execution/datasources/jdbc/JdbcUtils.scala  |  6 ++---
 .../apache/spark/sql/jdbc/JDBCWriteSuite.scala  |  6 ++---
 5 files changed, 35 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/fb07bbe5/docs/sql-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index 656e7ec..be53a8d 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -1061,10 +1061,11 @@ the following case-sensitive options:
   </tr>
 
   <tr>
-    <td><code>partitionColumn, lowerBound, upperBound, numPartitions</code></td>
+    <td><code>partitionColumn, lowerBound, upperBound</code></td>
     <td>
-      These options must all be specified if any of them is specified. They describe how to
-      partition the table when reading in parallel from multiple workers.
+      These options must all be specified if any of them is specified. In addition,
+      <code>numPartitions</code> must be specified. They describe how to partition the table when
+      reading in parallel from multiple workers.
       <code>partitionColumn</code> must be a numeric column from the table in question. Notice
       that <code>lowerBound</code> and <code>upperBound</code> are just used to decide the
      partition stride, not for filtering the rows in the table. So all rows in the table will be
@@ -1073,6 +1074,16 @@ the following case-sensitive options:
   </tr>
 
   <tr>
+     <td><code>numPartitions</code></td>
+     <td>
+       The maximum number of partitions that can be used for parallelism in table reading and
+       writing. This also determines the maximum number of concurrent JDBC connections.
+       If the number of partitions to write exceeds this limit, we decrease it to this limit by
+       calling <code>coalesce(numPartitions)</code> before writing.
+     </td>
+  </tr>
+
+  <tr>
     <td><code>fetchsize</code></td>
     <td>
      The JDBC fetch size, which determines how many rows to fetch per round trip. This can help performance on JDBC drivers which default to a low fetch size (e.g. Oracle with 10 rows). This option applies only to reading.
@@ -1087,13 +1098,6 @@ the following case-sensitive options:
   </tr>
 
   <tr>
-     <td><code>maxConnections</code></td>
-     <td>
-       The maximum number of concurrent JDBC connections that can be used, if set. Only applies when writing. It works by limiting the operation's parallelism, which depends on the input's partition count. If its partition count exceeds this limit, the operation will coalesce the input to fewer partitions before writing.
-     </td>
-  </tr>
-
-  <tr>
      <td><code>isolationLevel</code></td>
      <td>
       The transaction isolation level, which applies to the current connection. It can be one of <code>NONE</code>, <code>READ_COMMITTED</code>, <code>READ_UNCOMMITTED</code>, <code>REPEATABLE_READ</code>, or <code>SERIALIZABLE</code>, corresponding to the standard transaction isolation levels defined by JDBC's Connection object, with a default of <code>READ_UNCOMMITTED</code>. This option applies only to writing. Please refer to the documentation in <code>java.sql.Connection</code>.
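
As a hedged usage sketch of the read-side options documented above (not part of this commit; connection details, table, and column names are placeholders, and a running SparkSession `spark` is assumed):

```scala
// All four options below must be specified together for a partitioned read.
val df = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://localhost/test")
  .option("dbtable", "public.events")
  .option("partitionColumn", "id") // must be a numeric column of the table
  .option("lowerBound", "1")       // bounds set the partition stride only;
  .option("upperBound", "100000")  // they do not filter rows
  .option("numPartitions", "10")   // upper bound on read/write parallelism
  .load()
```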

http://git-wip-us.apache.org/repos/asf/spark/blob/fb07bbe5/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
index d416eec..fe2f4c1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
@@ -74,19 +74,23 @@ class JDBCOptions(
     }
   }
 
+  // the number of partitions
+  val numPartitions = parameters.get(JDBC_NUM_PARTITIONS).map(_.toInt)
+  require(numPartitions.isEmpty || numPartitions.get > 0,
+    s"Invalid value `${numPartitions.get}` for parameter `$JDBC_NUM_PARTITIONS`. " +
+      "The minimum value is 1.")
+
   // ------------------------------------------------------------
   // Optional parameters only for reading
   // ------------------------------------------------------------
   // the column used to partition
-  val partitionColumn = parameters.getOrElse(JDBC_PARTITION_COLUMN, null)
+  val partitionColumn = parameters.get(JDBC_PARTITION_COLUMN)
   // the lower bound of partition column
-  val lowerBound = parameters.getOrElse(JDBC_LOWER_BOUND, null)
+  val lowerBound = parameters.get(JDBC_LOWER_BOUND).map(_.toLong)
   // the upper bound of the partition column
-  val upperBound = parameters.getOrElse(JDBC_UPPER_BOUND, null)
-  // the number of partitions
-  val numPartitions = parameters.getOrElse(JDBC_NUM_PARTITIONS, null)
-  require(partitionColumn == null ||
-    (lowerBound != null && upperBound != null && numPartitions != null),
+  val upperBound = parameters.get(JDBC_UPPER_BOUND).map(_.toLong)
+  require(partitionColumn.isEmpty ||
+    (lowerBound.isDefined && upperBound.isDefined && numPartitions.isDefined),
     s"If '$JDBC_PARTITION_COLUMN' is specified then '$JDBC_LOWER_BOUND', '$JDBC_UPPER_BOUND'," +
       s" and '$JDBC_NUM_PARTITIONS' are required.")
   val fetchSize = {
@@ -122,11 +126,6 @@ class JDBCOptions(
       case "REPEATABLE_READ" => Connection.TRANSACTION_REPEATABLE_READ
       case "SERIALIZABLE" => Connection.TRANSACTION_SERIALIZABLE
     }
-  // the maximum number of connections
-  val maxConnections = parameters.get(JDBC_MAX_CONNECTIONS).map(_.toInt)
-  require(maxConnections.isEmpty || maxConnections.get > 0,
-    s"Invalid value `${maxConnections.get}` for parameter `$JDBC_MAX_CONNECTIONS`. " +
-      "The minimum value is 1.")
 }
 
 object JDBCOptions {
@@ -149,5 +148,4 @@ object JDBCOptions {
   val JDBC_CREATE_TABLE_OPTIONS = newOption("createTableOptions")
   val JDBC_BATCH_INSERT_SIZE = newOption("batchsize")
   val JDBC_TXN_ISOLATION_LEVEL = newOption("isolationLevel")
-  val JDBC_MAX_CONNECTIONS = newOption("maxConnections")
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/fb07bbe5/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
index 4420b3b..74f397c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
@@ -35,11 +35,13 @@ class JdbcRelationProvider extends CreatableRelationProvider
     val upperBound = jdbcOptions.upperBound
     val numPartitions = jdbcOptions.numPartitions
 
-    val partitionInfo = if (partitionColumn == null) {
+    val partitionInfo = if (partitionColumn.isEmpty) {
+      assert(lowerBound.isEmpty && upperBound.isEmpty)
       null
     } else {
+      assert(lowerBound.nonEmpty && upperBound.nonEmpty && numPartitions.nonEmpty)
       JDBCPartitioningInfo(
-        partitionColumn, lowerBound.toLong, upperBound.toLong, numPartitions.toInt)
+        partitionColumn.get, lowerBound.get, upperBound.get, numPartitions.get)
     }
     val parts = JDBCRelation.columnPartition(partitionInfo)
     JDBCRelation(parts, jdbcOptions)(sqlContext.sparkSession)

http://git-wip-us.apache.org/repos/asf/spark/blob/fb07bbe5/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index cdc3c99..c2a1ad8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -667,10 +667,10 @@ object JdbcUtils extends Logging {
     val getConnection: () => Connection = createConnectionFactory(options)
     val batchSize = options.batchSize
     val isolationLevel = options.isolationLevel
-    val maxConnections = options.maxConnections
+    val numPartitions = options.numPartitions
     val repartitionedDF =
-      if (maxConnections.isDefined && maxConnections.get < df.rdd.getNumPartitions) {
-        df.coalesce(maxConnections.get)
+      if (numPartitions.isDefined && numPartitions.get < df.rdd.getNumPartitions) {
+        df.coalesce(numPartitions.get)
       } else {
         df
       }
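
In isolation, the capping logic above amounts to the following minimal sketch (the method name is illustrative, not part of the patch):

```scala
import org.apache.spark.sql.DataFrame

// Shrink to `numPartitions` only when the input has more partitions;
// coalesce() can reduce but not increase the partition count, so the
// DataFrame is returned unchanged otherwise.
def capWriteParallelism(df: DataFrame, numPartitions: Option[Int]): DataFrame =
  numPartitions match {
    case Some(n) if n < df.rdd.getNumPartitions => df.coalesce(n)
    case _ => df
  }
```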

http://git-wip-us.apache.org/repos/asf/spark/blob/fb07bbe5/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
index 5795b4d..c834419 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
@@ -313,15 +313,15 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter {
       .save()
   }
 
-  test("SPARK-18413: Add `maxConnections` JDBCOption") {
+  test("SPARK-18413: Use `numPartitions` JDBCOption") {
     val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2)
     val e = intercept[IllegalArgumentException] {
       df.write.format("jdbc")
         .option("dbtable", "TEST.SAVETEST")
         .option("url", url1)
-        .option(s"${JDBCOptions.JDBC_MAX_CONNECTIONS}", "0")
+        .option(s"${JDBCOptions.JDBC_NUM_PARTITIONS}", "0")
         .save()
     }.getMessage
-    assert(e.contains("Invalid value `0` for parameter `maxConnections`. The minimum value is 1"))
+    assert(e.contains("Invalid value `0` for parameter `numPartitions`. The minimum value is 1"))
   }
 }

