Posted to commits@spark.apache.org by li...@apache.org on 2016/10/07 17:52:44 UTC

spark git commit: [SPARK-14525][SQL][FOLLOWUP] Clean up JdbcRelationProvider

Repository: spark
Updated Branches:
  refs/heads/master cff560755 -> aa3a6841e


[SPARK-14525][SQL][FOLLOWUP] Clean up JdbcRelationProvider

## What changes were proposed in this pull request?

This PR proposes cleaning up the confusing part of `createRelation`, as discussed in https://github.com/apache/spark/pull/12601/files#r80627940
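
For context, a minimal user-facing sketch of the write paths that `createRelation` dispatches on, matching the SaveMode-vs-tableExists matrix removed from the code comments below. The H2 URL, table name, and data are placeholders (not part of this change), and the sketch assumes the H2 driver is on the classpath:

```scala
import java.util.Properties

import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().master("local[*]").appName("jdbc-write-sketch").getOrCreate()
import spark.implicits._

// Placeholder in-memory H2 endpoint; assumes the H2 driver is on the classpath.
val url = "jdbc:h2:mem:testdb;DB_CLOSE_DELAY=-1"
val props = new Properties()
props.setProperty("driver", "org.h2.Driver")

val df = Seq((1, "a"), (2, "b")).toDF("id", "name")

// Table does not exist yet: every mode creates the table and saves the data.
df.write.mode(SaveMode.ErrorIfExists).jdbc(url, "PEOPLE", props)

// Table exists + Overwrite: drop and recreate the table by default.
df.write.mode(SaveMode.Overwrite).jdbc(url, "PEOPLE", props)

// Table exists + Overwrite + truncate: truncate and reload instead,
// but only when the dialect reports that TRUNCATE TABLE does not cascade.
df.write.mode(SaveMode.Overwrite).option("truncate", "true").jdbc(url, "PEOPLE", props)

// Table exists + Append: insert into the existing table.
df.write.mode(SaveMode.Append).jdbc(url, "PEOPLE", props)

// Table exists + Ignore: a no-op; existing data is left untouched.
df.write.mode(SaveMode.Ignore).jdbc(url, "PEOPLE", props)
```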

Also, this PR proposes the changes below:

 - Add documentation for `batchsize` and `isolationLevel`.
 - Move the property names `fetchsize`, `batchsize`, `isolationLevel` and `driver` into `JDBCOptions` so that they can be managed in a single place (a usage sketch follows below).
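
A hedged sketch of how these options are passed through the `jdbc` data source and picked up via `JDBCOptions`. The H2 URL and table names are placeholders, not part of this change, and the read assumes a `PEOPLE` table already exists:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("jdbc-options-sketch").getOrCreate()

// Placeholder in-memory H2 endpoint; assumes the H2 driver is on the classpath.
val url = "jdbc:h2:mem:testdb;DB_CLOSE_DELAY=-1"

// Read path: `driver` and `fetchsize` are looked up through JDBCOptions.
val people = spark.read
  .format("jdbc")
  .option("url", url)
  .option("dbtable", "PEOPLE")        // placeholder table, assumed to exist
  .option("driver", "org.h2.Driver")
  .option("fetchsize", "1000")
  .load()

// Write path: `batchsize` (minimum 1) and `isolationLevel` are likewise read from JDBCOptions.
people.write
  .format("jdbc")
  .option("url", url)
  .option("dbtable", "PEOPLE_COPY")   // placeholder table name
  .option("batchsize", "5000")
  .option("isolationLevel", "READ_COMMITTED")
  .mode("append")
  .save()
```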

## How was this patch tested?

Existing tests should cover this.

Author: hyukjinkwon <gu...@gmail.com>

Closes #15263 from HyukjinKwon/SPARK-14525.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/aa3a6841
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/aa3a6841
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/aa3a6841

Branch: refs/heads/master
Commit: aa3a6841ebaf45efb5d3930a93869948bdd0d2b6
Parents: cff5607
Author: hyukjinkwon <gu...@gmail.com>
Authored: Fri Oct 7 10:52:32 2016 -0700
Committer: gatorsmile <ga...@gmail.com>
Committed: Fri Oct 7 10:52:32 2016 -0700

----------------------------------------------------------------------
 .../datasources/jdbc/JdbcRelationProvider.scala | 82 ++++++++------------
 .../execution/datasources/jdbc/JdbcUtils.scala  | 29 ++++++-
 .../org/apache/spark/sql/jdbc/JDBCSuite.scala   |  2 +-
 .../apache/spark/sql/jdbc/JDBCWriteSuite.scala  | 13 ++++
 4 files changed, 74 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/aa3a6841/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
index ae04af2..3a8a197 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
@@ -22,6 +22,7 @@ import java.util.Properties
 import scala.collection.JavaConverters.mapAsJavaMapConverter
 
 import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SQLContext}
+import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils._
 import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister, RelationProvider}
 
 class JdbcRelationProvider extends CreatableRelationProvider
@@ -50,67 +51,52 @@ class JdbcRelationProvider extends CreatableRelationProvider
     JDBCRelation(jdbcOptions.url, jdbcOptions.table, parts, properties)(sqlContext.sparkSession)
   }
 
-  /*
-   * The following structure applies to this code:
-   *                 |    tableExists            |          !tableExists
-   *------------------------------------------------------------------------------------
-   * Ignore          | BaseRelation              | CreateTable, saveTable, BaseRelation
-   * ErrorIfExists   | ERROR                     | CreateTable, saveTable, BaseRelation
-   * Overwrite*      | (DropTable, CreateTable,) | CreateTable, saveTable, BaseRelation
-   *                 | saveTable, BaseRelation   |
-   * Append          | saveTable, BaseRelation   | CreateTable, saveTable, BaseRelation
-   *
-   * *Overwrite & tableExists with truncate, will not drop & create, but instead truncate
-   */
   override def createRelation(
       sqlContext: SQLContext,
       mode: SaveMode,
       parameters: Map[String, String],
-      data: DataFrame): BaseRelation = {
-    val jdbcOptions = new JDBCOptions(parameters)
-    val url = jdbcOptions.url
-    val table = jdbcOptions.table
-
+      df: DataFrame): BaseRelation = {
+    val options = new JDBCOptions(parameters)
+    val url = options.url
+    val table = options.table
+    val createTableOptions = options.createTableOptions
+    val isTruncate = options.isTruncate
     val props = new Properties()
     props.putAll(parameters.asJava)
-    val conn = JdbcUtils.createConnectionFactory(url, props)()
 
+    val conn = JdbcUtils.createConnectionFactory(url, props)()
     try {
       val tableExists = JdbcUtils.tableExists(conn, url, table)
+      if (tableExists) {
+        mode match {
+          case SaveMode.Overwrite =>
+            if (isTruncate && isCascadingTruncateTable(url).contains(false)) {
+              // In this case, we should truncate table and then load.
+              truncateTable(conn, table)
+              saveTable(df, url, table, props)
+            } else {
+              // Otherwise, do not truncate the table, instead drop and recreate it
+              dropTable(conn, table)
+              createTable(df.schema, url, table, createTableOptions, conn)
+              saveTable(df, url, table, props)
+            }
 
-      val (doCreate, doSave) = (mode, tableExists) match {
-        case (SaveMode.Ignore, true) => (false, false)
-        case (SaveMode.ErrorIfExists, true) => throw new AnalysisException(
-          s"Table or view '$table' already exists, and SaveMode is set to ErrorIfExists.")
-        case (SaveMode.Overwrite, true) =>
-          if (jdbcOptions.isTruncate && JdbcUtils.isCascadingTruncateTable(url) == Some(false)) {
-            JdbcUtils.truncateTable(conn, table)
-            (false, true)
-          } else {
-            JdbcUtils.dropTable(conn, table)
-            (true, true)
-          }
-        case (SaveMode.Append, true) => (false, true)
-        case (_, true) => throw new IllegalArgumentException(s"Unexpected SaveMode, '$mode'," +
-          " for handling existing tables.")
-        case (_, false) => (true, true)
-      }
+          case SaveMode.Append =>
+            saveTable(df, url, table, props)
+
+          case SaveMode.ErrorIfExists =>
+            throw new AnalysisException(
+              s"Table or view '$table' already exists. SaveMode: ErrorIfExists.")
 
-      if (doCreate) {
-        val schema = JdbcUtils.schemaString(data, url)
-        // To allow certain options to append when create a new table, which can be
-        // table_options or partition_options.
-        // E.g., "CREATE TABLE t (name string) ENGINE=InnoDB DEFAULT CHARSET=utf8"
-        val createtblOptions = jdbcOptions.createTableOptions
-        val sql = s"CREATE TABLE $table ($schema) $createtblOptions"
-        val statement = conn.createStatement
-        try {
-          statement.executeUpdate(sql)
-        } finally {
-          statement.close()
+          case SaveMode.Ignore =>
+            // With `SaveMode.Ignore` mode, if table already exists, the save operation is expected
+            // to not save the contents of the DataFrame and to not change the existing data.
+            // Therefore, it is okay to do nothing here and then just return the relation below.
         }
+      } else {
+        createTable(df.schema, url, table, createTableOptions, conn)
+        saveTable(df, url, table, props)
       }
-      if (doSave) JdbcUtils.saveTable(data, url, table, props)
     } finally {
       conn.close()
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/aa3a6841/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index 3db1d1f..66f2bad 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -552,7 +552,7 @@ object JdbcUtils extends Logging {
       isolationLevel: Int): Iterator[Byte] = {
     require(batchSize >= 1,
       s"Invalid value `${batchSize.toString}` for parameter " +
-      s"`${JdbcUtils.JDBC_BATCH_INSERT_SIZE}`. The minimum value is 1.")
+      s"`$JDBC_BATCH_INSERT_SIZE`. The minimum value is 1.")
 
     val conn = getConnection()
     var committed = false
@@ -657,10 +657,10 @@ object JdbcUtils extends Logging {
   /**
    * Compute the schema string for this RDD.
    */
-  def schemaString(df: DataFrame, url: String): String = {
+  def schemaString(schema: StructType, url: String): String = {
     val sb = new StringBuilder()
     val dialect = JdbcDialects.get(url)
-    df.schema.fields foreach { field =>
+    schema.fields foreach { field =>
       val name = dialect.quoteIdentifier(field.name)
       val typ: String = getJdbcType(field.dataType, dialect).databaseTypeDefinition
       val nullable = if (field.nullable) "" else "NOT NULL"
@@ -697,4 +697,27 @@ object JdbcUtils extends Logging {
       getConnection, table, iterator, rddSchema, nullTypes, batchSize, dialect, isolationLevel)
     )
   }
+
+  /**
+   * Creates a table with a given schema.
+   */
+  def createTable(
+      schema: StructType,
+      url: String,
+      table: String,
+      createTableOptions: String,
+      conn: Connection): Unit = {
+    val strSchema = schemaString(schema, url)
+    // Create the table if the table does not exist.
+    // To allow certain options to append when create a new table, which can be
+    // table_options or partition_options.
+    // E.g., "CREATE TABLE t (name string) ENGINE=InnoDB DEFAULT CHARSET=utf8"
+    val sql = s"CREATE TABLE $table ($strSchema) $createTableOptions"
+    val statement = conn.createStatement
+    try {
+      statement.executeUpdate(sql)
+    } finally {
+      statement.close()
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/aa3a6841/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 10f15ca..7cc3989 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -788,7 +788,7 @@ class JDBCSuite extends SparkFunSuite
 
   test("SPARK-16387: Reserved SQL words are not escaped by JDBC writer") {
     val df = spark.createDataset(Seq("a", "b", "c")).toDF("order")
-    val schema = JdbcUtils.schemaString(df, "jdbc:mysql://localhost:3306/temp")
+    val schema = JdbcUtils.schemaString(df.schema, "jdbc:mysql://localhost:3306/temp")
     assert(schema.contains("`order` TEXT"))
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/aa3a6841/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
index 5069713..62b29db 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
@@ -132,6 +132,19 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter {
     }
   }
 
+  test("CREATE with ignore") {
+    val df = spark.createDataFrame(sparkContext.parallelize(arr2x3), schema3)
+    val df2 = spark.createDataFrame(sparkContext.parallelize(arr1x2), schema2)
+
+    df.write.mode(SaveMode.Ignore).jdbc(url1, "TEST.DROPTEST", properties)
+    assert(2 === spark.read.jdbc(url1, "TEST.DROPTEST", properties).count())
+    assert(3 === spark.read.jdbc(url1, "TEST.DROPTEST", properties).collect()(0).length)
+
+    df2.write.mode(SaveMode.Ignore).jdbc(url1, "TEST.DROPTEST", properties)
+    assert(2 === spark.read.jdbc(url1, "TEST.DROPTEST", properties).count())
+    assert(3 === spark.read.jdbc(url1, "TEST.DROPTEST", properties).collect()(0).length)
+  }
+
   test("CREATE with overwrite") {
     val df = spark.createDataFrame(sparkContext.parallelize(arr2x3), schema3)
     val df2 = spark.createDataFrame(sparkContext.parallelize(arr1x2), schema2)

