Posted to commits@spark.apache.org by we...@apache.org on 2018/04/13 05:46:13 UTC
spark git commit: [SPARK-23815][CORE] Spark writer dynamic partition
overwrite mode may fail to write output on multi level partition
Repository: spark
Updated Branches:
refs/heads/master 1018be44d -> 4b0703679
[SPARK-23815][CORE] Spark writer dynamic partition overwrite mode may fail to write output on multi level partition
## What changes were proposed in this pull request?
Spark introduced a new writer mode in SPARK-20236 that overwrites only the related partitions. While using this feature on our production cluster, we found a bug when writing multi-level partitions on HDFS.
A simple test case to reproduce this issue:
val df = Seq(("1", "2", "3")).toDF("col1", "col2", "col3")
df.write.partitionBy("col1", "col2").mode("overwrite").save("/my/hdfs/location")
If HDFS location "/my/hdfs/location" does not exist, there will be no output.
This seems to be caused by the job commit change in SPARK-20236 in HadoopMapReduceCommitProtocol.
In the commit job process, the output is first written into the staging dir /my/hdfs/location/.spark-staging.xxx/col1=1/col2=2, and then the code calls fs.rename to rename /my/hdfs/location/.spark-staging.xxx/col1=1/col2=2 to /my/hdfs/location/col1=1/col2=2. However, in our case this operation fails on HDFS because /my/hdfs/location/col1=1 does not exist: an HDFS rename cannot create directories for more than one level.
This does not happen in the new unit test added with SPARK-20236, which uses the local file system.
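The failing precondition can be illustrated against the local filesystem with java.nio.file, which enforces the same rule as HDFS here: a rename fails when the destination's parent directory does not exist. This is only an analogy to the HDFS behavior, not Spark code; the directory names mimic the example above.

```java
import java.io.IOException;
import java.nio.file.*;

public class RenameParentDemo {
    /** Attempt a rename; return true on success, false on failure
     *  (e.g. NoSuchFileException because the dest parent is missing). */
    static boolean tryMove(Path src, Path dest) {
        try {
            Files.move(src, dest);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) throws IOException {
        Path base = Files.createTempDirectory("spark-23815-demo");
        // Simulate the staging output: .spark-staging.xxx/col1=1/col2=2
        Path staged = base.resolve("staging/col1=1/col2=2");
        Files.createDirectories(staged);

        // Final location col1=1/col2=2 whose parent col1=1 does not exist yet.
        Path dest = base.resolve("output/col1=1/col2=2");
        System.out.println(tryMove(staged, dest));  // false: parent is missing

        // After creating the parent directory, the same rename succeeds.
        Files.createDirectories(dest.getParent());
        System.out.println(tryMove(staged, dest));  // true
    }
}
```

The second move succeeds only because the parent was created first, which is exactly the gap the patch below closes for the Hadoop FileSystem.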
We propose a fix: when cleaning the current partition dir /my/hdfs/location/col1=1/col2=2 before the rename op, if the delete op fails (because /my/hdfs/location/col1=1/col2=2 may not exist), we call a mkdirs op to create the parent dir /my/hdfs/location/col1=1 (if it does not already exist) so the following rename op can succeed.
Reference: the official HDFS documentation (https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/filesystem/filesystem.html) states the precondition for rename: "dest must be root, or have a parent that exists".
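The proposed clean-up step can be sketched against the local filesystem as a simplified stand-in for the Hadoop FileSystem calls in HadoopMapReduceCommitProtocol (java.io.File.delete is non-recursive, unlike fs.delete(path, true); the class and method names here are illustrative, not the actual patch):

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;

public class DynamicOverwriteFixSketch {
    /**
     * Mirror of the patched logic: delete the final partition path, and if
     * the delete reports failure while the parent is also missing, create
     * the parent so the subsequent rename has a valid destination.
     */
    static boolean prepareAndRename(File staged, File finalPartPath) {
        // delete() returning false here typically means the path did not exist.
        if (!finalPartPath.delete() && !finalPartPath.getParentFile().exists()) {
            finalPartPath.getParentFile().mkdirs(); // create e.g. .../col1=1
        }
        return staged.renameTo(finalPartPath); // dest parent now exists
    }

    public static void main(String[] args) throws IOException {
        File base = Files.createTempDirectory("fix-demo").toFile();
        File staged = new File(base, "staging/col1=1/col2=2");
        staged.mkdirs();
        File dest = new File(base, "output/col1=1/col2=2");
        // Without the mkdirs fallback this rename would fail (missing parent).
        System.out.println(prepareAndRename(staged, dest)); // true
    }
}
```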
## How was this patch tested?
We have tested this patch on our production cluster and it fixed the problem.
Author: Fangshi Li <fl...@linkedin.com>
Closes #20931 from fangshil/master.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4b070367
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4b070367
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4b070367
Branch: refs/heads/master
Commit: 4b07036799b01894826b47c73142fe282c607a57
Parents: 1018be4
Author: Fangshi Li <fl...@linkedin.com>
Authored: Fri Apr 13 13:46:34 2018 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Fri Apr 13 13:46:34 2018 +0800
----------------------------------------------------------------------
.../internal/io/HadoopMapReduceCommitProtocol.scala | 12 +++++++++++-
1 file changed, 11 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/4b070367/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
index 6d20ef1..3e60c50 100644
--- a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
@@ -186,7 +186,17 @@ class HadoopMapReduceCommitProtocol(
logDebug(s"Clean up default partition directories for overwriting: $partitionPaths")
for (part <- partitionPaths) {
val finalPartPath = new Path(path, part)
- fs.delete(finalPartPath, true)
+ if (!fs.delete(finalPartPath, true) && !fs.exists(finalPartPath.getParent)) {
+ // According to the official hadoop FileSystem API spec, the delete op should assume
+ // the destination is no longer present regardless of the return value, thus we do not
+ // need to double check whether finalPartPath exists before the rename.
+ // Also in our case, based on the spec, delete returns false only when finalPartPath
+ // does not exist. When this happens, we need to take action if the parent of
+ // finalPartPath also does not exist (e.g. the scenario described in SPARK-23815),
+ // because the FileSystem API spec on the rename op says the rename dest (finalPartPath)
+ // must have a parent that exists, otherwise we may get an unexpected result from the rename.
+ fs.mkdirs(finalPartPath.getParent)
+ }
fs.rename(new Path(stagingDir, part), finalPartPath)
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org