Posted to issues@spark.apache.org by "Dongjoon Hyun (Jira)" <ji...@apache.org> on 2020/03/17 08:47:00 UTC

[jira] [Updated] (SPARK-29185) Add new SaveMode types for Spark SQL jdbc datasource

     [ https://issues.apache.org/jira/browse/SPARK-29185?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dongjoon Hyun updated SPARK-29185:
----------------------------------
    Affects Version/s:     (was: 3.0.0)
                       3.1.0

> Add new SaveMode types for Spark SQL jdbc datasource
> ----------------------------------------------------
>
>                 Key: SPARK-29185
>                 URL: https://issues.apache.org/jira/browse/SPARK-29185
>             Project: Spark
>          Issue Type: New Feature
>          Components: SQL
>    Affects Versions: 3.1.0
>            Reporter: Timothy Zhang
>            Priority: Major
>
> It is necessary to add new SaveMode types for Delete, Update, and Upsert, such as:
>  * SaveMode.Delete
>  * SaveMode.Update
>  * SaveMode.Upsert
> This would let Spark SQL support legacy RDBMSs such as Oracle, DB2, and MySQL much better. The code behind the current SaveMode.Append is already quite flexible: all modes could share the same savePartition function, and we would only need to add new getStatement functions for Delete, Update, and Upsert that generate DELETE FROM, UPDATE, and MERGE INTO statements respectively. We have an initial implementation for them:
> {code:java}
> import org.apache.spark.sql.jdbc.JdbcDialect
> import org.apache.spark.sql.types.StructType
>
>   // DELETE FROM <table> WHERE <every column> = ?  (one placeholder per column)
>   def getDeleteStatement(table: String, rddSchema: StructType, dialect: JdbcDialect): String = {
>     val columns = rddSchema.fields.map(x => dialect.quoteIdentifier(x.name) + "=?").mkString(" AND ")
>     s"DELETE FROM ${table.toUpperCase} WHERE $columns"
>   }
>
>   // UPDATE <table> SET <non-key columns> = ? WHERE <primary-key columns> = ?
>   def getUpdateStatement(table: String, rddSchema: StructType, priKeys: Seq[String], dialect: JdbcDialect): String = {
>     val fullCols = rddSchema.fields.map(x => dialect.quoteIdentifier(x.name))
>     val priCols = priKeys.map(dialect.quoteIdentifier(_))
>     val columns = (fullCols diff priCols).map(_ + "=?").mkString(",")
>     val conditions = priCols.map(_ + "=?").mkString(" AND ")
>     s"UPDATE ${table.toUpperCase} SET $columns WHERE $conditions"
>   }
>
>   // MERGE INTO <table>: update the non-key columns when the primary key matches,
>   // insert the full row otherwise
>   def getMergeStatement(table: String, rddSchema: StructType, priKeys: Seq[String], dialect: JdbcDialect): String = {
>     val fullCols = rddSchema.fields.map(x => dialect.quoteIdentifier(x.name))
>     val priCols = priKeys.map(dialect.quoteIdentifier(_))
>     val nrmCols = fullCols diff priCols
>     val fullPart = fullCols.map(c => s"${dialect.quoteIdentifier("SRC")}.$c").mkString(",")
>     val priPart = priCols.map(c => s"${dialect.quoteIdentifier("TGT")}.$c=${dialect.quoteIdentifier("SRC")}.$c").mkString(" AND ")
>     val nrmPart = nrmCols.map(c => s"$c=${dialect.quoteIdentifier("SRC")}.$c").mkString(",")
>     val columns = fullCols.mkString(",")
>     val placeholders = fullCols.map(_ => "?").mkString(",")
>     s"MERGE INTO ${table.toUpperCase} AS ${dialect.quoteIdentifier("TGT")} " +
>       s"USING TABLE(VALUES($placeholders)) " +
>       s"AS ${dialect.quoteIdentifier("SRC")}($columns) " +
>       s"ON $priPart " +
>       s"WHEN NOT MATCHED THEN INSERT ($columns) VALUES ($fullPart) " +
>       s"WHEN MATCHED THEN UPDATE SET $nrmPart"
>   }
> {code}
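> For illustration only, a minimal runnable sketch of what these helpers would generate; the sample table, schema, and primary key below are assumptions for this example, and the default JdbcDialect double-quote identifier quoting is used:
> {code:java}
> import org.apache.spark.sql.jdbc.JdbcDialect
> import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
>
> // Assumed sample schema with primary key "id" (hypothetical, for illustration).
> val schema = StructType(Seq(
>   StructField("id", IntegerType),
>   StructField("name", StringType)))
>
> // Anonymous dialect relying on the default double-quote identifier quoting.
> val dialect = new JdbcDialect { override def canHandle(url: String): Boolean = true }
>
> println(getDeleteStatement("employees", schema, dialect))
> // DELETE FROM EMPLOYEES WHERE "id"=? AND "name"=?
> println(getUpdateStatement("employees", schema, Seq("id"), dialect))
> // UPDATE EMPLOYEES SET "name"=? WHERE "id"=?
> println(getMergeStatement("employees", schema, Seq("id"), dialect))
> // MERGE INTO EMPLOYEES AS "TGT" USING TABLE(VALUES(?,?)) AS "SRC"("id","name")
> //   ON "TGT"."id"="SRC"."id"
> //   WHEN NOT MATCHED THEN INSERT ("id","name") VALUES ("SRC"."id","SRC"."name")
> //   WHEN MATCHED THEN UPDATE SET "name"="SRC"."name"
> {code}
> On the caller side, the proposal would surface as something like df.write.mode(SaveMode.Upsert) (hypothetical; not available in Spark today), leaving the rest of the JDBC write path unchanged.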



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org