Posted to commits@spark.apache.org by sr...@apache.org on 2016/11/21 13:57:43 UTC

spark git commit: [SPARK-18413][SQL] Add `maxConnections` JDBCOption

Repository: spark
Updated Branches:
  refs/heads/master 9f262ae16 -> 07beb5d21


[SPARK-18413][SQL] Add `maxConnections` JDBCOption

## What changes were proposed in this pull request?

This PR adds a new JDBCOption `maxConnections`, the maximum number of simultaneous JDBC connections allowed. The option applies only to writing and defaults to the number of partitions of the RDD; when the input has more partitions than the limit, the write coalesces them first. Previously, SQL users could not control this, while Scala/Java/Python users could use the `coalesce` (or `repartition`) API.
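With the DataFrame writer API, the option is passed like any other JDBC option. A minimal sketch, assuming an existing DataFrame `df`, a local MySQL instance, and placeholder table/credentials (the cap of 2 is an arbitrary illustrative value):

```scala
// Hypothetical usage: cap this JDBC write at 2 simultaneous connections.
// If df has more than 2 partitions, the write coalesces to 2 before saving.
df.write
  .format("jdbc")
  .option("url", "jdbc:mysql://localhost:3306/t") // placeholder URL
  .option("dbtable", "t1")                        // placeholder table
  .option("user", "root")
  .option("password", "")
  .option("maxConnections", "2")
  .save()
```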

**Reported Scenario**

In the following case, the number of connections becomes 200 and the database cannot handle all of them.

```sql
CREATE OR REPLACE TEMPORARY VIEW resultview
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:oracle:thin:10.129.10.111:1521:BKDB",
  dbtable "result",
  user "HIVE",
  password "HIVE"
);
-- set spark.sql.shuffle.partitions=200
INSERT OVERWRITE TABLE resultview SELECT g, count(1) AS COUNT FROM tnet.DT_LIVE_INFO GROUP BY g
```
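With this patch, the same view can cap the write-side connections directly in its options. A sketch via the spark-shell, mirroring the test session below; the cap of 16 is an arbitrary illustrative value, not from the report:

```scala
// Hypothetical fix: recreate the view with maxConnections so the INSERT
// OVERWRITE coalesces its 200 shuffle partitions down to at most 16.
sql("""CREATE OR REPLACE TEMPORARY VIEW resultview
       USING org.apache.spark.sql.jdbc
       OPTIONS (url 'jdbc:oracle:thin:10.129.10.111:1521:BKDB',
                dbtable 'result', user 'HIVE', password 'HIVE',
                maxConnections '16')""")
```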

## How was this patch tested?

Manual. Run the following steps and check the Spark UI.

**Step 1 (MySQL)**
```
CREATE TABLE t1 (a INT);
CREATE TABLE data (a INT);
INSERT INTO data VALUES (1);
INSERT INTO data VALUES (2);
INSERT INTO data VALUES (3);
```

**Step 2 (Spark)**
```scala
SPARK_HOME=$PWD bin/spark-shell --driver-memory 4G --driver-class-path mysql-connector-java-5.1.40-bin.jar
scala> sql("SET spark.sql.shuffle.partitions=3")
scala> sql("CREATE OR REPLACE TEMPORARY VIEW data USING org.apache.spark.sql.jdbc OPTIONS (url 'jdbc:mysql://localhost:3306/t', dbtable 'data', user 'root', password '')")
scala> sql("CREATE OR REPLACE TEMPORARY VIEW t1 USING org.apache.spark.sql.jdbc OPTIONS (url 'jdbc:mysql://localhost:3306/t', dbtable 't1', user 'root', password '', maxConnections '1')")
scala> sql("INSERT OVERWRITE TABLE t1 SELECT a FROM data GROUP BY a")
scala> sql("CREATE OR REPLACE TEMPORARY VIEW t1 USING org.apache.spark.sql.jdbc OPTIONS (url 'jdbc:mysql://localhost:3306/t', dbtable 't1', user 'root', password '', maxConnections '2')")
scala> sql("INSERT OVERWRITE TABLE t1 SELECT a FROM data GROUP BY a")
scala> sql("CREATE OR REPLACE TEMPORARY VIEW t1 USING org.apache.spark.sql.jdbc OPTIONS (url 'jdbc:mysql://localhost:3306/t', dbtable 't1', user 'root', password '', maxConnections '3')")
scala> sql("INSERT OVERWRITE TABLE t1 SELECT a FROM data GROUP BY a")
scala> sql("CREATE OR REPLACE TEMPORARY VIEW t1 USING org.apache.spark.sql.jdbc OPTIONS (url 'jdbc:mysql://localhost:3306/t', dbtable 't1', user 'root', password '', maxConnections '4')")
scala> sql("INSERT OVERWRITE TABLE t1 SELECT a FROM data GROUP BY a")
```

![maxconnections](https://cloud.githubusercontent.com/assets/9700541/20287987/ed8409c2-aa84-11e6-8aab-ae28e63fe54d.png)
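The same check can be driven through the DataFrame writer instead of a SQL view. A minimal sketch reusing the tables above; with 3 input partitions and a cap of 2, the write should run as 2 tasks in the Spark UI:

```scala
// Hypothetical equivalent of the maxConnections '2' step above.
val df = sql("SELECT a FROM data GROUP BY a") // 3 shuffle partitions
df.write
  .format("jdbc")
  .option("url", "jdbc:mysql://localhost:3306/t")
  .option("dbtable", "t1")
  .option("user", "root")
  .option("password", "")
  .option("maxConnections", "2")
  .mode("overwrite")
  .save()
```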

Author: Dongjoon Hyun <do...@apache.org>

Closes #15868 from dongjoon-hyun/SPARK-18413.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/07beb5d2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/07beb5d2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/07beb5d2

Branch: refs/heads/master
Commit: 07beb5d21c6803e80733149f1560c71cd3cacc86
Parents: 9f262ae
Author: Dongjoon Hyun <do...@apache.org>
Authored: Mon Nov 21 13:57:36 2016 +0000
Committer: Sean Owen <so...@cloudera.com>
Committed: Mon Nov 21 13:57:36 2016 +0000

----------------------------------------------------------------------
 docs/sql-programming-guide.md                           |  7 +++++++
 .../sql/execution/datasources/jdbc/JDBCOptions.scala    |  6 ++++++
 .../sql/execution/datasources/jdbc/JdbcUtils.scala      |  9 ++++++++-
 .../org/apache/spark/sql/jdbc/JDBCWriteSuite.scala      | 12 ++++++++++++
 4 files changed, 33 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/07beb5d2/docs/sql-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index ba3e55f..656e7ec 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -1087,6 +1087,13 @@ the following case-sensitive options:
   </tr>
 
   <tr>
+     <td><code>maxConnections</code></td>
+     <td>
+       If set, the maximum number of concurrent JDBC connections that may be used when writing. It works by limiting the operation's parallelism: if the input's partition count exceeds this limit, the operation coalesces the input to fewer partitions before writing.
+     </td>
+  </tr>
+
+  <tr>
      <td><code>isolationLevel</code></td>
      <td>
       The transaction isolation level, which applies to the current connection. It can be one of <code>NONE</code>, <code>READ_COMMITTED</code>, <code>READ_UNCOMMITTED</code>, <code>REPEATABLE_READ</code>, or <code>SERIALIZABLE</code>, corresponding to the standard transaction isolation levels defined by JDBC's Connection object, with a default of <code>READ_UNCOMMITTED</code>. This option applies only to writing. Please refer to the documentation in <code>java.sql.Connection</code>.

http://git-wip-us.apache.org/repos/asf/spark/blob/07beb5d2/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
index 7f419b5..d416eec 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
@@ -122,6 +122,11 @@ class JDBCOptions(
       case "REPEATABLE_READ" => Connection.TRANSACTION_REPEATABLE_READ
       case "SERIALIZABLE" => Connection.TRANSACTION_SERIALIZABLE
     }
+  // the maximum number of connections
+  val maxConnections = parameters.get(JDBC_MAX_CONNECTIONS).map(_.toInt)
+  require(maxConnections.isEmpty || maxConnections.get > 0,
+    s"Invalid value `${maxConnections.get}` for parameter `$JDBC_MAX_CONNECTIONS`. " +
+      "The minimum value is 1.")
 }
 
 object JDBCOptions {
@@ -144,4 +149,5 @@ object JDBCOptions {
   val JDBC_CREATE_TABLE_OPTIONS = newOption("createTableOptions")
   val JDBC_BATCH_INSERT_SIZE = newOption("batchsize")
   val JDBC_TXN_ISOLATION_LEVEL = newOption("isolationLevel")
+  val JDBC_MAX_CONNECTIONS = newOption("maxConnections")
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/07beb5d2/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index 41edb65..cdc3c99 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -667,7 +667,14 @@ object JdbcUtils extends Logging {
     val getConnection: () => Connection = createConnectionFactory(options)
     val batchSize = options.batchSize
     val isolationLevel = options.isolationLevel
-    df.foreachPartition(iterator => savePartition(
+    val maxConnections = options.maxConnections
+    val repartitionedDF =
+      if (maxConnections.isDefined && maxConnections.get < df.rdd.getNumPartitions) {
+        df.coalesce(maxConnections.get)
+      } else {
+        df
+      }
+    repartitionedDF.foreachPartition(iterator => savePartition(
       getConnection, table, iterator, rddSchema, nullTypes, batchSize, dialect, isolationLevel)
     )
   }
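The guard above only ever narrows parallelism; it never repartitions upward. An illustrative standalone restatement of the same decision (the helper name is made up for this sketch, not part of the patch):

```scala
import org.apache.spark.sql.DataFrame

// Cap a DataFrame's partition count at `limit` without ever increasing it:
// coalesce only merges partitions, so a smaller input passes through unchanged.
def capPartitions(df: DataFrame, limit: Option[Int]): DataFrame = limit match {
  case Some(n) if n < df.rdd.getNumPartitions => df.coalesce(n)
  case _ => df
}
```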

http://git-wip-us.apache.org/repos/asf/spark/blob/07beb5d2/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
index e3d3c6c..5795b4d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
@@ -312,4 +312,16 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter {
       .options(properties.asScala)
       .save()
   }
+
+  test("SPARK-18413: Add `maxConnections` JDBCOption") {
+    val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2)
+    val e = intercept[IllegalArgumentException] {
+      df.write.format("jdbc")
+        .option("dbtable", "TEST.SAVETEST")
+        .option("url", url1)
+        .option(s"${JDBCOptions.JDBC_MAX_CONNECTIONS}", "0")
+        .save()
+    }.getMessage
+    assert(e.contains("Invalid value `0` for parameter `maxConnections`. The minimum value is 1"))
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org