Posted to issues@spark.apache.org by "xuchuanyin (JIRA)" <ji...@apache.org> on 2017/11/13 07:22:00 UTC
[jira] [Created] (SPARK-22504) Optimization in overwrite table in case of failure
xuchuanyin created SPARK-22504:
----------------------------------
Summary: Optimization in overwrite table in case of failure
Key: SPARK-22504
URL: https://issues.apache.org/jira/browse/SPARK-22504
Project: Spark
Issue Type: Improvement
Components: SQL
Affects Versions: 2.1.0
Reporter: xuchuanyin
# SCENARIO
Currently, the `Overwrite` operation in Spark is performed in the following steps:
1. DROP : drop old table
2. WRITE: create and write data into new table
If some runtime error occurs in Step2, then the original table will be lost along with its data -- I think this is a serious problem if someone performs `read-update-flushback` actions. The problem can be reproduced by the following code:
```scala
01: test("test spark df overwrite failed") {
02: // prepare table
03: val tableName = "test_spark_overwrite_failed"
04: sql(s"DROP TABLE IF EXISTS $tableName")
05: sql(s"CREATE TABLE IF NOT EXISTS $tableName ( field_int int, field_string String)" +
06: s" STORED AS parquet").collect()
07:
08: // load data first
09: val schema = StructType(
10: Seq(StructField("field_int", DataTypes.IntegerType, nullable = false),
11: StructField("field_string", DataTypes.StringType, nullable = false)))
12: val rdd1 = sqlContext.sparkContext.parallelize(
13: Row(20, "q") ::
14: Row(21, "qw") ::
15: Row(23, "qwe") :: Nil)
16: val dataFrame = sqlContext.createDataFrame(rdd1, schema)
17: dataFrame.write.format("parquet").mode(SaveMode.Overwrite).saveAsTable(tableName)
18: sql(s"SELECT * FROM $tableName").show()
19:
20: // load data again, the following data will cause failure in data loading
21: try {
22: val rdd2 = sqlContext.sparkContext.parallelize(
23: Row(31, "qwer") ::
24: Row(null, "qwer") ::
25: Row(32, "long_than_5") :: Nil)
26: val dataFrame2 = sqlContext.createDataFrame(rdd2, schema)
27:
28: dataFrame2.write.format("parquet").mode(SaveMode.Overwrite).saveAsTable(tableName)
29: } catch {
30: case e: Exception => LOGGER.error(e, "write overwrite failure")
31: }
32: // table `test_spark_overwrite_failed` has been dropped
33: sql(s"show tables").show(20, truncate = false)
34: // the content is empty even if the table exists; we want it to show the old data as in Line18
35: sql(s"SELECT * FROM $tableName").show()
36: }
```
In Line24, we create a `null` element while the schema field is not nullable -- this will cause a runtime error while loading the data.
In Line33, the table `test_spark_overwrite_failed` has already been dropped and no longer exists in the current database, so of course Line35 will fail.
Instead, we want Line35 to show the original data, just as Line18 does.
# ANALYZE
I am thinking of optimizing `overwrite` in Spark -- the goal is to keep the old data until the load has finished successfully; only then can the old data be cleaned up.
Since Spark SQL already supports the `rename` operation, we can optimize `overwrite` with the following steps:
1. WRITE: create and write data to tempTable
2. SWAP: swap tempTable with targetTable by using the rename operation
3. CLEAN: clean up old data
If Step1 succeeds, swap tempTable with targetTable and clean up the old data; otherwise, leave the target table unchanged.
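The three steps above can be sketched with a minimal, self-contained simulation. The "catalog" below is just a mutable Map standing in for the metastore, and names like `safeOverwrite` are hypothetical, not existing Spark API -- the point is only to show that a failure during WRITE can never touch the target table:

```scala
import scala.collection.mutable

// Stand-in for the metastore: table name -> table data.
val catalog = mutable.Map[String, Seq[Int]]()

// Hypothetical WRITE-SWAP-CLEAN overwrite.
def safeOverwrite(target: String, loadNewData: () => Seq[Int]): Unit = {
  val temp = target + "_tmp"
  // 1. WRITE: load into a temp table; a failure here leaves `target` untouched.
  catalog(temp) = loadNewData()
  // 2. SWAP: rename the temp table over the target (a metastore-only operation).
  val old = catalog.remove(target)
  catalog(target) = catalog.remove(temp).get
  // 3. CLEAN: old data (`old`) is discarded only after the swap succeeded;
  //    with real tables this is where the backup would be dropped.
}

catalog("t") = Seq(1, 2, 3)
try safeOverwrite("t", () => throw new RuntimeException("bad row"))
catch { case _: RuntimeException => () }
assert(catalog("t") == Seq(1, 2, 3)) // original data survives the failed load
safeOverwrite("t", () => Seq(4, 5))
assert(catalog("t") == Seq(4, 5))    // a successful overwrite replaces it
```

In the real implementation the SWAP step would presumably be `ALTER TABLE ... RENAME TO ...` statements against the metastore, which are cheap compared to rewriting the data files.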