You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by zs...@apache.org on 2017/02/28 18:49:31 UTC

spark git commit: [SPARK-19677][SS] Committing a delta file atop an existing one should not fail on HDFS

Repository: spark
Updated Branches:
  refs/heads/master 7c7fc30b4 -> 9734a928a


[SPARK-19677][SS] Committing a delta file atop an existing one should not fail on HDFS

## What changes were proposed in this pull request?

When the final delta file for a version already exists, `HDFSBackedStateStoreProvider` fails to commit on HDFS but not on the local filesystem. According to the [implementation notes](https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/filesystem/filesystem.html) of `rename()`, the local filesystem and HDFS behave differently in this case:

> Destination exists and is a file
> Renaming a file atop an existing file is specified as failing, raising an exception.
>    - Local FileSystem : the rename succeeds; the destination file is replaced by the source file.
>    - HDFS : The rename fails, no exception is raised. Instead the method call simply returns false.

This patch ensures that `rename()` isn't called if the destination file already exists. Skipping the rename is still semantically correct because Structured Streaming requires that rerunning a batch generates the same output.

## How was this patch tested?

This patch was tested by running `StateStoreSuite`.

Author: Roberto Agostino Vitillo <ra...@gmail.com>

Closes #17012 from vitillo/fix_rename.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9734a928
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9734a928
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9734a928

Branch: refs/heads/master
Commit: 9734a928a75d29ea202e9f309f92ca4637d35671
Parents: 7c7fc30
Author: Roberto Agostino Vitillo <ra...@gmail.com>
Authored: Tue Feb 28 10:49:07 2017 -0800
Committer: Shixiong Zhu <sh...@databricks.com>
Committed: Tue Feb 28 10:49:07 2017 -0800

----------------------------------------------------------------------
 .../state/HDFSBackedStateStoreProvider.scala    | 11 ++++++-
 .../streaming/state/StateStoreSuite.scala       | 31 +++++++++++++++-----
 2 files changed, 34 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9734a928/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
index 61eb601..2d29940 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
@@ -274,7 +274,16 @@ private[state] class HDFSBackedStateStoreProvider(
   private def commitUpdates(newVersion: Long, map: MapType, tempDeltaFile: Path): Path = {
     synchronized {
       val finalDeltaFile = deltaFile(newVersion)
-      if (!fs.rename(tempDeltaFile, finalDeltaFile)) {
+
+      // scalastyle:off
+      // Renaming a file atop an existing one fails on HDFS
+      // (http://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/filesystem/filesystem.html).
+      // Hence we should either skip the rename step or delete the target file. Because deleting the
+      // target file will break speculation, skipping the rename step is the only choice. It's still
+      // semantically correct because Structured Streaming requires that rerunning a batch
+      // generates the same output. (SPARK-19677)
+      // scalastyle:on
+      if (!fs.exists(finalDeltaFile) && !fs.rename(tempDeltaFile, finalDeltaFile)) {
         throw new IOException(s"Failed to rename $tempDeltaFile to $finalDeltaFile")
       }
       loadedMaps.put(newVersion, map)

http://git-wip-us.apache.org/repos/asf/spark/blob/9734a928/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
index 6b38b6a..dc4e935 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
@@ -210,13 +210,6 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMeth
     assert(store1.commit() === 2)
     assert(rowsToSet(store1.iterator()) === Set("a" -> 1, "b" -> 1))
     assert(getDataFromFiles(provider) === Set("a" -> 1, "b" -> 1))
-
-    // Overwrite the version with other data
-    val store2 = provider.getStore(1)
-    put(store2, "c", 1)
-    assert(store2.commit() === 2)
-    assert(rowsToSet(store2.iterator()) === Set("a" -> 1, "c" -> 1))
-    assert(getDataFromFiles(provider) === Set("a" -> 1, "c" -> 1))
   }
 
   test("snapshotting") {
@@ -292,6 +285,15 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMeth
     assert(getDataFromFiles(provider, 19) === Set("a" -> 19))
   }
 
+  test("SPARK-19677: Committing a delta file atop an existing one should not fail on HDFS") {
+    // Point the Hadoop configuration at the fake filesystem whose rename() refuses to overwrite.
+    val hdfsLikeConf = new Configuration()
+    hdfsLikeConf.set("fs.fake.impl", classOf[RenameLikeHDFSFileSystem].getName)
+    hdfsLikeConf.set("fs.default.name", "fake:///")
+    val provider = newStoreProvider(hadoopConf = hdfsLikeConf)
+    // Committing from version 0 twice makes the second commit land on an existing delta file.
+    (1 to 2).foreach { _ => provider.getStore(0).commit() }
+  }
 
   test("corrupted file handling") {
     val provider = newStoreProvider(minDeltasForSnapshot = 5)
@@ -682,6 +684,21 @@ private[state] object StateStoreSuite {
 }
 
 /**
+ * Fake FileSystem that simulates HDFS rename semantics, i.e. renaming a file atop an existing
+ * one returns false.
+ * See hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/filesystem/filesystem.html
+ */
+class RenameLikeHDFSFileSystem extends RawLocalFileSystem {
+  // Unlike the local filesystem, never replace an existing destination: report the
+  // collision the way HDFS does, by returning false instead of throwing.
+  override def rename(src: Path, dst: Path): Boolean = {
+    val destinationTaken = exists(dst)
+    // Short-circuit so the underlying rename is attempted only for a free destination.
+    !destinationTaken && super.rename(src, dst)
+  }
+}
+
+/**
  * Fake FileSystem to test that the StateStore throws an exception while committing the
  * delta file, when `fs.rename` returns `false`.
  */


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org