Posted to commits@spark.apache.org by we...@apache.org on 2016/12/02 13:48:41 UTC

spark git commit: [SPARK-18419][SQL] `JDBCRelation.insert` should not remove Spark options

Repository: spark
Updated Branches:
  refs/heads/master 294163ee9 -> 55d528f2b


[SPARK-18419][SQL] `JDBCRelation.insert` should not remove Spark options

## What changes were proposed in this pull request?

Currently, `JDBCRelation.insert` removes Spark options too early by mistakenly using `asConnectionProperties`. Spark options like `numPartitions` should be passed into `DataFrameWriter.jdbc` correctly. This bug has been **hidden** because `JDBCOptions.asConnectionProperties` fails to filter out mixed-case options. This PR fixes both issues.

**JDBCRelation.insert**
```scala
override def insert(data: DataFrame, overwrite: Boolean): Unit = {
  val url = jdbcOptions.url
  val table = jdbcOptions.table
- val properties = jdbcOptions.asConnectionProperties
+ val properties = jdbcOptions.asProperties
  data.write
    .mode(if (overwrite) SaveMode.Overwrite else SaveMode.Append)
    .jdbc(url, table, properties)
}
```

**JDBCOptions.asConnectionProperties**
```scala
scala> import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
scala> import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
scala> new JDBCOptions(Map("url" -> "jdbc:mysql://localhost:3306/temp", "dbtable" -> "t1", "numPartitions" -> "10")).asConnectionProperties
res0: java.util.Properties = {numpartitions=10}
scala> new JDBCOptions(new CaseInsensitiveMap(Map("url" -> "jdbc:mysql://localhost:3306/temp", "dbtable" -> "t1", "numPartitions" -> "10"))).asConnectionProperties
res1: java.util.Properties = {numpartitions=10}
```
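To make the failure mode concrete, here is a minimal, self-contained sketch of the root cause, using plain Scala collections rather than Spark's actual `JDBCOptions` and `CaseInsensitiveMap` (the object and value names are illustrative only): option names were registered in their original mixed case, while `CaseInsensitiveMap` lowercases its keys, so a case-sensitive filter never matches them and the option leaks into the connection properties.

```scala
// Illustrative sketch only; not Spark's real classes.
object FilterSketch {
  // Option names as `newOption` registered them before the fix: mixed case.
  val jdbcOptionNames = Set("url", "dbtable", "numPartitions")

  def main(args: Array[String]): Unit = {
    // CaseInsensitiveMap stores keys lowercased, so "numPartitions"
    // arrives here as "numpartitions".
    val parameters = Map(
      "url" -> "jdbc:mysql://localhost:3306/temp",
      "dbtable" -> "t1",
      "numpartitions" -> "10")

    // Buggy filter: "numpartitions" is not in the mixed-case set,
    // so it survives the filter and leaks into connection properties.
    val leaked = parameters.filter { case (k, _) => !jdbcOptionNames.contains(k) }
    println(leaked) // Map(numpartitions -> 10)

    // The fix: normalize both sides to lowercase before comparing.
    val lowered = jdbcOptionNames.map(_.toLowerCase)
    val filtered = parameters.filter { case (k, _) => !lowered.contains(k.toLowerCase) }
    println(filtered) // Map()
  }
}
```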

## How was this patch tested?

Pass the Jenkins tests with a new test case.

Author: Dongjoon Hyun <do...@apache.org>

Closes #15863 from dongjoon-hyun/SPARK-18419.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/55d528f2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/55d528f2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/55d528f2

Branch: refs/heads/master
Commit: 55d528f2ba0ba689dbb881616d9436dc7958e943
Parents: 294163e
Author: Dongjoon Hyun <do...@apache.org>
Authored: Fri Dec 2 21:48:22 2016 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Fri Dec 2 21:48:22 2016 +0800

----------------------------------------------------------------------
 .../datasources/jdbc/JDBCOptions.scala          | 23 +++++++++++++++-----
 .../execution/datasources/jdbc/JDBCRDD.scala    |  1 -
 .../datasources/jdbc/JDBCRelation.scala         |  2 +-
 .../org/apache/spark/sql/jdbc/JDBCSuite.scala   | 10 +++++++++
 4 files changed, 28 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/55d528f2/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
index 56cd178..6fd2e0d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
@@ -20,8 +20,6 @@ package org.apache.spark.sql.execution.datasources.jdbc
 import java.sql.{Connection, DriverManager}
 import java.util.Properties
 
-import scala.collection.mutable.ArrayBuffer
-
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 
 /**
@@ -41,10 +39,23 @@ class JDBCOptions(
       JDBCOptions.JDBC_TABLE_NAME -> table)))
   }
 
+  /**
+   * Returns a Properties object with all options.
+   */
+  val asProperties: Properties = {
+    val properties = new Properties()
+    parameters.foreach { case (k, v) => properties.setProperty(k, v) }
+    properties
+  }
+
+  /**
+   * Returns a Properties object with all options except Spark internal data source options like
+   * `url`, `dbtable`, and `numPartitions`. This should be used when invoking a JDBC API such as
+   * `Driver.connect`, because each DBMS vendor has its own property list for its JDBC driver. See SPARK-17776.
+   */
   val asConnectionProperties: Properties = {
     val properties = new Properties()
-    // We should avoid to pass the options into properties. See SPARK-17776.
-    parameters.filterKeys(!jdbcOptionNames.contains(_))
+    parameters.filterKeys(key => !jdbcOptionNames(key.toLowerCase))
       .foreach { case (k, v) => properties.setProperty(k, v) }
     properties
   }
@@ -126,10 +137,10 @@ class JDBCOptions(
 }
 
 object JDBCOptions {
-  private val jdbcOptionNames = ArrayBuffer.empty[String]
+  private val jdbcOptionNames = collection.mutable.Set[String]()
 
   private def newOption(name: String): String = {
-    jdbcOptionNames += name
+    jdbcOptionNames += name.toLowerCase
     name
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/55d528f2/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
index 37df283..d5b11e7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
@@ -54,7 +54,6 @@ object JDBCRDD extends Logging {
   def resolveTable(options: JDBCOptions): StructType = {
     val url = options.url
     val table = options.table
-    val properties = options.asConnectionProperties
     val dialect = JdbcDialects.get(url)
     val conn: Connection = JdbcUtils.createConnectionFactory(options)()
     try {

http://git-wip-us.apache.org/repos/asf/spark/blob/55d528f2/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
index 5ca1c75..8b45dba 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
@@ -131,7 +131,7 @@ private[sql] case class JDBCRelation(
   override def insert(data: DataFrame, overwrite: Boolean): Unit = {
     val url = jdbcOptions.url
     val table = jdbcOptions.table
-    val properties = jdbcOptions.asConnectionProperties
+    val properties = jdbcOptions.asProperties
     data.write
       .mode(if (overwrite) SaveMode.Overwrite else SaveMode.Append)
       .jdbc(url, table, properties)

http://git-wip-us.apache.org/repos/asf/spark/blob/55d528f2/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 7f312db..74ca66b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -26,6 +26,7 @@ import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.{DataFrame, Row}
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.execution.DataSourceScanExec
 import org.apache.spark.sql.execution.command.ExplainCommand
 import org.apache.spark.sql.execution.datasources.LogicalRelation
@@ -890,4 +891,13 @@ class JDBCSuite extends SparkFunSuite
     assert(sql("SELECT * FROM mixedCaseCols WHERE Id = 1 OR Name = 'mary'").collect().size == 2)
     assert(sql("SELECT * FROM mixedCaseCols WHERE Name = 'mary' AND Id = 2").collect().size == 1)
   }
+
+  test("SPARK-18419: Fix `asConnectionProperties` to filter case-insensitively") {
+    val parameters = Map(
+      "url" -> "jdbc:mysql://localhost:3306/temp",
+      "dbtable" -> "t1",
+      "numPartitions" -> "10")
+    assert(new JDBCOptions(parameters).asConnectionProperties.isEmpty)
+    assert(new JDBCOptions(new CaseInsensitiveMap(parameters)).asConnectionProperties.isEmpty)
+  }
 }
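A quick spark-shell sanity check mirroring the new test, assuming a build of this branch (`JDBCOptions` is an internal API, so this is for verification only; key order in a printed `Properties` is unspecified, hence assertions instead of output):

```scala
import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions

val options = new JDBCOptions(Map(
  "url" -> "jdbc:mysql://localhost:3306/temp",
  "dbtable" -> "t1",
  "numPartitions" -> "10"))

// Spark-internal options are now filtered out case-insensitively, so
// nothing leaks into the properties handed to the JDBC driver.
assert(options.asConnectionProperties.isEmpty)

// The full option set is preserved for DataFrameWriter.jdbc; keys are
// lowercased because JDBCOptions wraps parameters in CaseInsensitiveMap.
assert(options.asProperties.getProperty("numpartitions") == "10")
```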

