You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2017/01/19 04:58:38 UTC

spark git commit: [SPARK-16968][SQL][BACKPORT-2.0] Add additional options in jdbc when creating a new table

Repository: spark
Updated Branches:
  refs/heads/branch-2.0 ee4e8faff -> 9fc053c30


[SPARK-16968][SQL][BACKPORT-2.0] Add additional options in jdbc when creating a new table

### What changes were proposed in this pull request?
This PR is to backport the PRs https://github.com/apache/spark/pull/14559 and https://github.com/apache/spark/pull/14683

---
In the PR, we just allow the user to add additional options when create a new table in JDBC writer.
The options can be table_options or partition_options.
E.g., "CREATE TABLE t (name string) ENGINE=InnoDB DEFAULT CHARSET=utf8"

Here is the usage example:
```
df.write.option("createTableOptions", "ENGINE=InnoDB DEFAULT CHARSET=utf8").jdbc(...)
```
### How was this patch tested?
Added a test case.

Author: gatorsmile <ga...@gmail.com>

Closes #16634 from gatorsmile/backportSPARK-16968.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9fc053c3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9fc053c3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9fc053c3

Branch: refs/heads/branch-2.0
Commit: 9fc053c30ae3670a1bcacacf33838750ddaca676
Parents: ee4e8fa
Author: GraceH <jh...@paypal.com>
Authored: Wed Jan 18 20:58:31 2017 -0800
Committer: gatorsmile <ga...@gmail.com>
Committed: Wed Jan 18 20:58:31 2017 -0800

----------------------------------------------------------------------
 docs/sql-programming-guide.md                   |  7 ++++++
 .../org/apache/spark/sql/DataFrameWriter.scala  | 24 ++++++++++++++------
 .../datasources/jdbc/JDBCOptions.scala          | 17 +++++++++++++-
 .../apache/spark/sql/jdbc/JDBCWriteSuite.scala  | 10 ++++++++
 4 files changed, 50 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9fc053c3/docs/sql-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index d38aed1..274310f 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -1023,6 +1023,13 @@ the Data Sources API. The following options are supported:
       The JDBC fetch size, which determines how many rows to fetch per round trip. This can help performance on JDBC drivers which default to low fetch size (eg. Oracle with 10 rows).
     </td>
   </tr>
+
+  <tr>
+    <td><code>createTableOptions</code></td>
+    <td>
+      This is a JDBC writer related option. If specified, this option allows setting of database-specific table and partition options when creating a table. For example: <code>CREATE TABLE t (name string) ENGINE=InnoDB.</code>
+   </td>
+  </tr>
 </table>
 
 <div class="codetabs">

http://git-wip-us.apache.org/repos/asf/spark/blob/9fc053c3/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index a4c4a5d..3cad7df 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, Project}
 import org.apache.spark.sql.execution.datasources.{BucketSpec, CreateTableUsingAsSelect, DataSource, HadoopFsRelation}
-import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
+import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils}
 
 /**
  * Interface used to write a [[Dataset]] to external storage systems (e.g. file systems,
@@ -399,6 +399,12 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
     assertNotPartitioned("jdbc")
     assertNotBucketed("jdbc")
 
+    // to add required options like URL and dbtable
+    val params = extraOptions.toMap ++ Map("url" -> url, "dbtable" -> table)
+    val jdbcOptions = new JDBCOptions(params)
+    val jdbcUrl = jdbcOptions.url
+    val jdbcTable = jdbcOptions.table
+
     val props = new Properties()
     extraOptions.foreach { case (key, value) =>
       props.put(key, value)
@@ -408,25 +414,29 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
     val conn = JdbcUtils.createConnectionFactory(url, props)()
 
     try {
-      var tableExists = JdbcUtils.tableExists(conn, url, table)
+      var tableExists = JdbcUtils.tableExists(conn, jdbcUrl, jdbcTable)
 
       if (mode == SaveMode.Ignore && tableExists) {
         return
       }
 
       if (mode == SaveMode.ErrorIfExists && tableExists) {
-        sys.error(s"Table $table already exists.")
+        sys.error(s"Table $jdbcTable already exists.")
       }
 
       if (mode == SaveMode.Overwrite && tableExists) {
-        JdbcUtils.dropTable(conn, table)
+        JdbcUtils.dropTable(conn, jdbcTable)
         tableExists = false
       }
 
       // Create the table if the table didn't exist.
       if (!tableExists) {
-        val schema = JdbcUtils.schemaString(df, url)
-        val sql = s"CREATE TABLE $table ($schema)"
+        val schema = JdbcUtils.schemaString(df, jdbcUrl)
+        // To allow certain options to append when create a new table, which can be
+        // table_options or partition_options.
+        // E.g., "CREATE TABLE t (name string) ENGINE=InnoDB DEFAULT CHARSET=utf8"
+        val createtblOptions = jdbcOptions.createTableOptions
+        val sql = s"CREATE TABLE $jdbcTable ($schema) $createtblOptions"
         val statement = conn.createStatement
         try {
           statement.executeUpdate(sql)
@@ -438,7 +448,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
       conn.close()
     }
 
-    JdbcUtils.saveTable(df, url, table, props)
+    JdbcUtils.saveTable(df, jdbcUrl, jdbcTable, props)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/9fc053c3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
index 6c6ec89..a9f156d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
@@ -20,14 +20,21 @@ package org.apache.spark.sql.execution.datasources.jdbc
 /**
  * Options for the JDBC data source.
  */
-private[jdbc] class JDBCOptions(
+class JDBCOptions(
     @transient private val parameters: Map[String, String])
   extends Serializable {
 
+  // ------------------------------------------------------------
+  // Required parameters
+  // ------------------------------------------------------------
   // a JDBC URL
   val url = parameters.getOrElse("url", sys.error("Option 'url' not specified"))
   // name of table
   val table = parameters.getOrElse("dbtable", sys.error("Option 'dbtable' not specified"))
+
+  // ------------------------------------------------------------
+  // Optional parameter list
+  // ------------------------------------------------------------
   // the column used to partition
   val partitionColumn = parameters.getOrElse("partitionColumn", null)
   // the lower bound of partition column
@@ -36,4 +43,12 @@ private[jdbc] class JDBCOptions(
   val upperBound = parameters.getOrElse("upperBound", null)
   // the number of partitions
   val numPartitions = parameters.getOrElse("numPartitions", null)
+
+  // ------------------------------------------------------------
+  // The options for DataFrameWriter
+  // ------------------------------------------------------------
+  // the create table option , which can be table_options or partition_options.
+  // E.g., "CREATE TABLE t (name string) ENGINE=InnoDB DEFAULT CHARSET=utf8"
+  // TODO: to reuse the existing partition parameters for those partition specific options
+  val createTableOptions = parameters.getOrElse("createTableOptions", "")
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/9fc053c3/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
index 2c6449f..a6e63e8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
@@ -155,6 +155,16 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter {
     assert(2 === spark.read.jdbc(url1, "TEST.TRUNCATETEST", properties).collect()(0).length)
   }
 
+  test("createTableOptions") {
+    val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2)
+
+    val m = intercept[org.h2.jdbc.JdbcSQLException] {
+      df.write.option("createTableOptions", "ENGINE tableEngineName")
+        .jdbc(url1, "TEST.CREATETBLOPTS", properties)
+    }.getMessage
+    assert(m.contains("Class \"TABLEENGINENAME\" not found"))
+  }
+
   test("Incompatible INSERT to append") {
     val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2)
     val df2 = spark.createDataFrame(sparkContext.parallelize(arr2x3), schema3)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org