Posted to issues@spark.apache.org by "Hyukjin Kwon (JIRA)" <ji...@apache.org> on 2019/05/21 04:15:09 UTC

[jira] [Resolved] (SPARK-22504) Optimization in overwrite table in case of failure

     [ https://issues.apache.org/jira/browse/SPARK-22504?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hyukjin Kwon resolved SPARK-22504.
----------------------------------
    Resolution: Incomplete

> Optimization in overwrite table in case of failure
> --------------------------------------------------
>
>                 Key: SPARK-22504
>                 URL: https://issues.apache.org/jira/browse/SPARK-22504
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 2.1.0
>            Reporter: xuchuanyin
>            Priority: Major
>              Labels: bulk-closed
>
> Optimization in overwrite table in case of failure
> # SCENARIO
> Currently, the `Overwrite` operation in Spark is performed in the following steps:
> 1. DROP: drop the old table
> 2. WRITE: create the new table and write data into it
> If a runtime error occurs in step 2, the original table is lost along with its data. I think this is a serious problem for anyone who performs `read-update-flushback` actions. The problem can be reproduced with the following code:
> ```scala
> 01: test("test spark df overwrite failed") {
> 02:     // prepare table
> 03:     val tableName = "test_spark_overwrite_failed"
> 04:     sql(s"DROP TABLE IF EXISTS $tableName")
> 05:     sql(s"CREATE TABLE IF NOT EXISTS $tableName ( field_int int, field_string String)" +
> 06:         s" STORED AS parquet").collect()
> 07: 
> 08:     // load data first
> 09:     val schema = StructType(
> 10:       Seq(StructField("field_int", DataTypes.IntegerType, nullable = false),
> 11:         StructField("field_string", DataTypes.StringType, nullable = false)))
> 12:     val rdd1 = sqlContext.sparkContext.parallelize(
> 13:       Row(20, "q") ::
> 14:       Row(21, "qw") ::
> 15:       Row(23, "qwe") :: Nil)
> 16:     val dataFrame = sqlContext.createDataFrame(rdd1, schema)
> 17:     dataFrame.write.format("parquet").mode(SaveMode.Overwrite).saveAsTable(tableName)
> 18:     sql(s"SELECT * FROM $tableName").show()
> 19: 
> 20:     // load data again, the following data will cause failure in data loading
> 21:     try {
> 22:       val rdd2 = sqlContext.sparkContext.parallelize(
> 23:         Row(31, "qwer") ::
> 24:         Row(null, "qwer") ::
> 25:         Row(32, "long_than_5") :: Nil)
> 26:       val dataFrame2 = sqlContext.createDataFrame(rdd2, schema)
> 27: 
> 28:       dataFrame2.write.format("parquet").mode(SaveMode.Overwrite).saveAsTable(tableName)
> 29:     } catch {
> 30:       case e: Exception => LOGGER.error(e, "write overwrite failure")
> 31:     }
> 32:     // table `test_spark_overwrite_failed` has been dropped
> 33:     sql(s"show tables").show(20, truncate = false)
> 34:     // the content is empty even if the table exists. We want it to be the same as Line18
> 35:     sql(s"SELECT * FROM $tableName").show()
> 36:   }
> ```
> In Line24, we create a `null` element while the schema declares the field as not nullable. This causes a runtime error while loading the data.
> In Line33, table `test_spark_overwrite_failed` has already been dropped and no longer exists in the current database, so Line35 fails.
> Instead, we want Line35 to show the original data, just as Line18 does.
> # ANALYSIS
> I am thinking of optimizing `overwrite` in Spark. The goal is to keep the old data until the load has finished successfully; the old data should be cleaned up only after the load succeeds.
> Since Spark SQL already supports the `rename` operation, we can implement `overwrite` in the following steps:
> 1. WRITE: create tempTable and write the data into it
> 2. SWAP: swap tempTable with targetTable by using the rename operation
> 3. CLEAN: clean up the old data
> If step 1 succeeds, swap tempTable with targetTable and clean up the old data; otherwise, leave targetTable unchanged.
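> The WRITE / SWAP / CLEAN steps above could be sketched roughly as follows. This is only an illustration under stated assumptions, not how Spark implements `overwrite`: the helper `overwriteViaSwap`, the object `SafeOverwrite`, and the `_tmp_` / `_old_` naming scheme are hypothetical, and the two renames are not atomic, so a concurrent reader may briefly see no target table.
> ```scala
> import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
> import scala.util.control.NonFatal
>
> object SafeOverwrite {
>   // Hypothetical helper (not part of Spark's API): overwrite `target`
>   // while keeping the old data until the new data is fully written.
>   def overwriteViaSwap(spark: SparkSession, df: DataFrame, target: String): Unit = {
>     val suffix = System.currentTimeMillis()
>     val tmp = s"${target}_tmp_$suffix" // assumed temp-table naming scheme
>     val old = s"${target}_old_$suffix" // assumed backup-table naming scheme
>
>     // 1. WRITE: if this fails, the target table has not been touched.
>     try {
>       df.write.format("parquet").mode(SaveMode.ErrorIfExists).saveAsTable(tmp)
>     } catch {
>       case NonFatal(e) =>
>         spark.sql(s"DROP TABLE IF EXISTS $tmp")
>         throw e
>     }
>
>     // 2. SWAP: move the old table aside, then put the new one in its place.
>     val hadTarget = spark.catalog.tableExists(target)
>     if (hadTarget) spark.sql(s"ALTER TABLE $target RENAME TO $old")
>     try {
>       spark.sql(s"ALTER TABLE $tmp RENAME TO $target")
>     } catch {
>       case NonFatal(e) =>
>         // Roll back so the original data stays reachable under `target`.
>         if (hadTarget) spark.sql(s"ALTER TABLE $old RENAME TO $target")
>         throw e
>     }
>
>     // 3. CLEAN: the old data is removed only after the swap succeeded.
>     if (hadTarget) spark.sql(s"DROP TABLE IF EXISTS $old")
>   }
> }
> ```
> With such a helper, Line28 of the reproduction would become `SafeOverwrite.overwriteViaSwap(sqlContext.sparkSession, dataFrame2, tableName)`, and a failure in the write step would leave the data from Line17 in place.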


