You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "xuchuanyin (JIRA)" <ji...@apache.org> on 2017/11/13 07:22:00 UTC

[jira] [Created] (SPARK-22504) Optimization in overwrite table in case of failure

xuchuanyin created SPARK-22504:
----------------------------------

             Summary: Optimization in overwrite table in case of failure
                 Key: SPARK-22504
                 URL: https://issues.apache.org/jira/browse/SPARK-22504
             Project: Spark
          Issue Type: Improvement
          Components: SQL
    Affects Versions: 2.1.0
            Reporter: xuchuanyin


Optimization in overwrite table in case of failure

# SCENARIO

Currently, `Overwrite` operation in spark is performed by following steps: 
1. DROP : drop old table
2. WRITE: create and write data into new table

If some runtime error occurs in Step2, then the origin table will be lost along with its data -- I think this will be a serious problem if someone perform `read-update-flushback` actions. The problem can be reproduced by the following code:

```scala

01: test("test spark df overwrite failed") {
02:     // prepare table
03:     val tableName = "test_spark_overwrite_failed"
04:     sql(s"DROP TABLE IF EXISTS $tableName")
05:     sql(s"CREATE TABLE IF NOT EXISTS $tableName ( field_int int, field_string String)" +
06:         s" STORED AS parquet").collect()
07: 
08:     // load data first
09:     val schema = StructType(
10:       Seq(StructField("field_int", DataTypes.IntegerType, nullable = false),
11:         StructField("field_string", DataTypes.StringType, nullable = false)))
12:     val rdd1 = sqlContext.sparkContext.parallelize(
13:       Row(20, "q") ::
14:       Row(21, "qw") ::
15:       Row(23, "qwe") :: Nil)
16:     val dataFrame = sqlContext.createDataFrame(rdd1, schema)
17:     dataFrame.write.format("parquet").mode(SaveMode.Overwrite).saveAsTable(tableName)
18:     sql(s"SELECT * FROM $tableName").show()
19: 
20:     // load data again, the following data will cause failure in data loading
21:     try {
22:       val rdd2 = sqlContext.sparkContext.parallelize(
23:         Row(31, "qwer") ::
24:         Row(null, "qwer") ::
25:         Row(32, "long_than_5") :: Nil)
26:       val dataFrame2 = sqlContext.createDataFrame(rdd2, schema)
27: 
28:       dataFrame2.write.format("parquet").mode(SaveMode.Overwrite).saveAsTable(tableName)
29:     } catch {
30:       case e: Exception => LOGGER.error(e, "write overwrite failure")
31:     }
32:     // table `test_spark_overwrite_failed` has been dropped
33:     sql(s"show tables").show(20, truncate = false)
34:     // the content is empty even if table exists. We want it to be the same as 
35:     sql(s"SELECT * FROM $tableName").show()
36:   }

```
In Line24, we creata a `null` element while the schema is `notnull` -- This will cause runtime error in loading data.
In Line33, table `test_spark_overwrite_failed` has already been dropped and no longger exists in the current table. And of course Line35 will fail.
Instead, we want Line35 to show the origin data just as Line18.

# ANALYZE

I am thinking of optimizing `overwrite` in spark -- The goal is to keep the old data until the load has finished successfully. The old data can only be cleaned when the load is successful.

Since sparksql already support `rename` operation, we can optimize `overwrite` in the following steps:
1. WRITE: create and write data to tempTable
2. SWAP: swap temptable1 with targetTable by using rename operation
3. CLEAN: clean up old data

If step1 works fine, then swap tempTable with targetTable and clean up old data; otherwise, keep the target table not changed.




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org