You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2016/10/07 17:52:44 UTC
spark git commit: [SPARK-14525][SQL][FOLLOWUP] Clean up
JdbcRelationProvider
Repository: spark
Updated Branches:
refs/heads/master cff560755 -> aa3a6841e
[SPARK-14525][SQL][FOLLOWUP] Clean up JdbcRelationProvider
## What changes were proposed in this pull request?
This PR proposes cleaning up the confusing part in `createRelation` as discussed in https://github.com/apache/spark/pull/12601/files#r80627940
Also, this PR proposes the changes below:
- Add documentation for `batchsize` and `isolationLevel`.
- Move the property names `fetchsize`, `batchsize`, `isolationLevel` and `driver` into `JDBCOptions` so that they can be managed in a single place.
## How was this patch tested?
Existing tests should cover this.
Author: hyukjinkwon <gu...@gmail.com>
Closes #15263 from HyukjinKwon/SPARK-14525.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/aa3a6841
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/aa3a6841
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/aa3a6841
Branch: refs/heads/master
Commit: aa3a6841ebaf45efb5d3930a93869948bdd0d2b6
Parents: cff5607
Author: hyukjinkwon <gu...@gmail.com>
Authored: Fri Oct 7 10:52:32 2016 -0700
Committer: gatorsmile <ga...@gmail.com>
Committed: Fri Oct 7 10:52:32 2016 -0700
----------------------------------------------------------------------
.../datasources/jdbc/JdbcRelationProvider.scala | 82 ++++++++------------
.../execution/datasources/jdbc/JdbcUtils.scala | 29 ++++++-
.../org/apache/spark/sql/jdbc/JDBCSuite.scala | 2 +-
.../apache/spark/sql/jdbc/JDBCWriteSuite.scala | 13 ++++
4 files changed, 74 insertions(+), 52 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/aa3a6841/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
index ae04af2..3a8a197 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
@@ -22,6 +22,7 @@ import java.util.Properties
import scala.collection.JavaConverters.mapAsJavaMapConverter
import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SQLContext}
+import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils._
import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister, RelationProvider}
class JdbcRelationProvider extends CreatableRelationProvider
@@ -50,67 +51,52 @@ class JdbcRelationProvider extends CreatableRelationProvider
JDBCRelation(jdbcOptions.url, jdbcOptions.table, parts, properties)(sqlContext.sparkSession)
}
- /*
- * The following structure applies to this code:
- * | tableExists | !tableExists
- *------------------------------------------------------------------------------------
- * Ignore | BaseRelation | CreateTable, saveTable, BaseRelation
- * ErrorIfExists | ERROR | CreateTable, saveTable, BaseRelation
- * Overwrite* | (DropTable, CreateTable,) | CreateTable, saveTable, BaseRelation
- * | saveTable, BaseRelation |
- * Append | saveTable, BaseRelation | CreateTable, saveTable, BaseRelation
- *
- * *Overwrite & tableExists with truncate, will not drop & create, but instead truncate
- */
override def createRelation(
sqlContext: SQLContext,
mode: SaveMode,
parameters: Map[String, String],
- data: DataFrame): BaseRelation = {
- val jdbcOptions = new JDBCOptions(parameters)
- val url = jdbcOptions.url
- val table = jdbcOptions.table
-
+ df: DataFrame): BaseRelation = {
+ val options = new JDBCOptions(parameters)
+ val url = options.url
+ val table = options.table
+ val createTableOptions = options.createTableOptions
+ val isTruncate = options.isTruncate
val props = new Properties()
props.putAll(parameters.asJava)
- val conn = JdbcUtils.createConnectionFactory(url, props)()
+ val conn = JdbcUtils.createConnectionFactory(url, props)()
try {
val tableExists = JdbcUtils.tableExists(conn, url, table)
+ if (tableExists) {
+ mode match {
+ case SaveMode.Overwrite =>
+ if (isTruncate && isCascadingTruncateTable(url).contains(false)) {
+ // In this case, we should truncate table and then load.
+ truncateTable(conn, table)
+ saveTable(df, url, table, props)
+ } else {
+ // Otherwise, do not truncate the table, instead drop and recreate it
+ dropTable(conn, table)
+ createTable(df.schema, url, table, createTableOptions, conn)
+ saveTable(df, url, table, props)
+ }
- val (doCreate, doSave) = (mode, tableExists) match {
- case (SaveMode.Ignore, true) => (false, false)
- case (SaveMode.ErrorIfExists, true) => throw new AnalysisException(
- s"Table or view '$table' already exists, and SaveMode is set to ErrorIfExists.")
- case (SaveMode.Overwrite, true) =>
- if (jdbcOptions.isTruncate && JdbcUtils.isCascadingTruncateTable(url) == Some(false)) {
- JdbcUtils.truncateTable(conn, table)
- (false, true)
- } else {
- JdbcUtils.dropTable(conn, table)
- (true, true)
- }
- case (SaveMode.Append, true) => (false, true)
- case (_, true) => throw new IllegalArgumentException(s"Unexpected SaveMode, '$mode'," +
- " for handling existing tables.")
- case (_, false) => (true, true)
- }
+ case SaveMode.Append =>
+ saveTable(df, url, table, props)
+
+ case SaveMode.ErrorIfExists =>
+ throw new AnalysisException(
+ s"Table or view '$table' already exists. SaveMode: ErrorIfExists.")
- if (doCreate) {
- val schema = JdbcUtils.schemaString(data, url)
- // To allow certain options to append when create a new table, which can be
- // table_options or partition_options.
- // E.g., "CREATE TABLE t (name string) ENGINE=InnoDB DEFAULT CHARSET=utf8"
- val createtblOptions = jdbcOptions.createTableOptions
- val sql = s"CREATE TABLE $table ($schema) $createtblOptions"
- val statement = conn.createStatement
- try {
- statement.executeUpdate(sql)
- } finally {
- statement.close()
+ case SaveMode.Ignore =>
+ // With `SaveMode.Ignore` mode, if table already exists, the save operation is expected
+ // to not save the contents of the DataFrame and to not change the existing data.
+ // Therefore, it is okay to do nothing here and then just return the relation below.
}
+ } else {
+ createTable(df.schema, url, table, createTableOptions, conn)
+ saveTable(df, url, table, props)
}
- if (doSave) JdbcUtils.saveTable(data, url, table, props)
} finally {
conn.close()
}
http://git-wip-us.apache.org/repos/asf/spark/blob/aa3a6841/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index 3db1d1f..66f2bad 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -552,7 +552,7 @@ object JdbcUtils extends Logging {
isolationLevel: Int): Iterator[Byte] = {
require(batchSize >= 1,
s"Invalid value `${batchSize.toString}` for parameter " +
- s"`${JdbcUtils.JDBC_BATCH_INSERT_SIZE}`. The minimum value is 1.")
+ s"`$JDBC_BATCH_INSERT_SIZE`. The minimum value is 1.")
val conn = getConnection()
var committed = false
@@ -657,10 +657,10 @@ object JdbcUtils extends Logging {
/**
* Compute the schema string for this RDD.
*/
- def schemaString(df: DataFrame, url: String): String = {
+ def schemaString(schema: StructType, url: String): String = {
val sb = new StringBuilder()
val dialect = JdbcDialects.get(url)
- df.schema.fields foreach { field =>
+ schema.fields foreach { field =>
val name = dialect.quoteIdentifier(field.name)
val typ: String = getJdbcType(field.dataType, dialect).databaseTypeDefinition
val nullable = if (field.nullable) "" else "NOT NULL"
@@ -697,4 +697,27 @@ object JdbcUtils extends Logging {
getConnection, table, iterator, rddSchema, nullTypes, batchSize, dialect, isolationLevel)
)
}
+
+ /**
+ * Creates a table with a given schema.
+ */
+ def createTable(
+ schema: StructType,
+ url: String,
+ table: String,
+ createTableOptions: String,
+ conn: Connection): Unit = {
+ val strSchema = schemaString(schema, url)
+ // Create the table if the table does not exist.
+ // To allow certain options to append when create a new table, which can be
+ // table_options or partition_options.
+ // E.g., "CREATE TABLE t (name string) ENGINE=InnoDB DEFAULT CHARSET=utf8"
+ val sql = s"CREATE TABLE $table ($strSchema) $createTableOptions"
+ val statement = conn.createStatement
+ try {
+ statement.executeUpdate(sql)
+ } finally {
+ statement.close()
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/spark/blob/aa3a6841/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 10f15ca..7cc3989 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -788,7 +788,7 @@ class JDBCSuite extends SparkFunSuite
test("SPARK-16387: Reserved SQL words are not escaped by JDBC writer") {
val df = spark.createDataset(Seq("a", "b", "c")).toDF("order")
- val schema = JdbcUtils.schemaString(df, "jdbc:mysql://localhost:3306/temp")
+ val schema = JdbcUtils.schemaString(df.schema, "jdbc:mysql://localhost:3306/temp")
assert(schema.contains("`order` TEXT"))
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/aa3a6841/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
index 5069713..62b29db 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
@@ -132,6 +132,19 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter {
}
}
+ test("CREATE with ignore") {
+ val df = spark.createDataFrame(sparkContext.parallelize(arr2x3), schema3)
+ val df2 = spark.createDataFrame(sparkContext.parallelize(arr1x2), schema2)
+
+ df.write.mode(SaveMode.Ignore).jdbc(url1, "TEST.DROPTEST", properties)
+ assert(2 === spark.read.jdbc(url1, "TEST.DROPTEST", properties).count())
+ assert(3 === spark.read.jdbc(url1, "TEST.DROPTEST", properties).collect()(0).length)
+
+ df2.write.mode(SaveMode.Ignore).jdbc(url1, "TEST.DROPTEST", properties)
+ assert(2 === spark.read.jdbc(url1, "TEST.DROPTEST", properties).count())
+ assert(3 === spark.read.jdbc(url1, "TEST.DROPTEST", properties).collect()(0).length)
+ }
+
test("CREATE with overwrite") {
val df = spark.createDataFrame(sparkContext.parallelize(arr2x3), schema3)
val df2 = spark.createDataFrame(sparkContext.parallelize(arr1x2), schema2)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org