Posted to commits@spark.apache.org by td...@apache.org on 2016/11/08 23:08:13 UTC

spark git commit: [SPARK-18342] Make rename failures fatal in HDFSBackedStateStore

Repository: spark
Updated Branches:
  refs/heads/master b6de0c98c -> 6f7ecb0f2


[SPARK-18342] Make rename failures fatal in HDFSBackedStateStore

## What changes were proposed in this pull request?

If the rename operation in the state store fails (`fs.rename` returns `false`), the StateStore should throw an exception so that the task is retried. Currently, a failed rename goes unnoticed at execution time; the failure only surfaces later, when snapshot operations fail and any attempt at recovery (after an executor failure, or from checkpoint recovery) fails as well.
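
A minimal standalone sketch of the check-and-throw pattern the patch applies (`renameOrThrow` is an illustrative helper, not part of the patch): Hadoop's `FileSystem.rename` reports many failures by returning `false` rather than throwing, so the caller must check the result explicitly.

```scala
import java.io.IOException
import org.apache.hadoop.fs.{FileSystem, Path}

// Hadoop's FileSystem.rename signals most failures by returning false
// instead of throwing, so the return value must be checked explicitly.
def renameOrThrow(fs: FileSystem, src: Path, dst: Path): Unit = {
  if (!fs.rename(src, dst)) {
    // Surfacing the failure here lets Spark fail the task and retry it,
    // instead of silently committing a version whose delta file is missing.
    throw new IOException(s"Failed to rename $src to $dst")
  }
}
```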

## How was this patch tested?

Unit test in `StateStoreSuite`: a fake `FileSystem` whose `rename` always returns `false` is registered under a custom scheme, and the test verifies that `store.commit()` throws (see the sketch below and the diff).
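
A rough sketch of the test's approach (the class name and `failrename` scheme here are illustrative; the actual test uses `RenameReturnsFalseFileSystem` with a randomized scheme, as shown in the diff below):

```scala
import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, RawLocalFileSystem}

// Fake FileSystem whose rename always fails. Extending RawLocalFileSystem
// keeps every other operation (create, open, delete, listStatus) working.
class AlwaysFailingRenameFs extends RawLocalFileSystem {
  override def getUri: URI = URI.create("failrename:///")
  override def rename(src: Path, dst: Path): Boolean = false
}

object AlwaysFailingRenameFs {
  // Hadoop resolves "fs.<scheme>.impl" to pick the FileSystem class for a
  // scheme, so only paths under "failrename://" hit the fake implementation.
  def hadoopConf: Configuration = {
    val conf = new Configuration()
    conf.set("fs.failrename.impl", classOf[AlwaysFailingRenameFs].getName)
    conf
  }
}
```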

Author: Burak Yavuz <br...@gmail.com>

Closes #15804 from brkyvz/rename-state.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6f7ecb0f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6f7ecb0f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6f7ecb0f

Branch: refs/heads/master
Commit: 6f7ecb0f2975d24a71e4240cf623f5bd8992bbeb
Parents: b6de0c9
Author: Burak Yavuz <br...@gmail.com>
Authored: Tue Nov 8 15:08:09 2016 -0800
Committer: Tathagata Das <ta...@gmail.com>
Committed: Tue Nov 8 15:08:09 2016 -0800

----------------------------------------------------------------------
 .../state/HDFSBackedStateStoreProvider.scala    |  6 ++-
 .../streaming/state/StateStoreSuite.scala       | 41 +++++++++++++++++---
 2 files changed, 40 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6f7ecb0f/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
index f1e7f1d..8087131 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
@@ -254,7 +254,9 @@ private[state] class HDFSBackedStateStoreProvider(
   private def commitUpdates(newVersion: Long, map: MapType, tempDeltaFile: Path): Path = {
     synchronized {
       val finalDeltaFile = deltaFile(newVersion)
-      fs.rename(tempDeltaFile, finalDeltaFile)
+      if (!fs.rename(tempDeltaFile, finalDeltaFile)) {
+        throw new IOException(s"Failed to rename $tempDeltaFile to $finalDeltaFile")
+      }
       loadedMaps.put(newVersion, map)
       finalDeltaFile
     }
@@ -525,7 +527,7 @@ private[state] class HDFSBackedStateStoreProvider(
 
         val deltaFiles = allFiles.filter { file =>
           file.version > snapshotFile.version && file.version <= version
-        }
+        }.toList
         verify(
           deltaFiles.size == version - snapshotFile.version,
           s"Unexpected list of delta files for version $version for $this: $deltaFiles"

http://git-wip-us.apache.org/repos/asf/spark/blob/6f7ecb0f/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
index fcf300b..504a265 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
@@ -17,13 +17,14 @@
 
 package org.apache.spark.sql.execution.streaming.state
 
-import java.io.File
+import java.io.{File, IOException}
+import java.net.URI
 
 import scala.collection.mutable
 import scala.util.Random
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem}
 import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
 import org.scalatest.concurrent.Eventually._
 import org.scalatest.time.SpanSugar._
@@ -455,6 +456,18 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMeth
     }
   }
 
+  test("SPARK-18342: commit fails when rename fails") {
+    import RenameReturnsFalseFileSystem._
+    val dir = scheme + "://" + Utils.createDirectory(tempDir, Random.nextString(5)).toString
+    val conf = new Configuration()
+    conf.set(s"fs.$scheme.impl", classOf[RenameReturnsFalseFileSystem].getName)
+    val provider = newStoreProvider(dir = dir, hadoopConf = conf)
+    val store = provider.getStore(0)
+    put(store, "a", 0)
+    val e = intercept[IllegalStateException](store.commit())
+    assert(e.getCause.getMessage.contains("Failed to rename"))
+  }
+
   def getDataFromFiles(
       provider: HDFSBackedStateStoreProvider,
     version: Int = -1): Set[(String, Int)] = {
@@ -524,9 +537,10 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMeth
   def newStoreProvider(
       opId: Long = Random.nextLong,
       partition: Int = 0,
-      minDeltasForSnapshot: Int = SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.defaultValue.get
+      minDeltasForSnapshot: Int = SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.defaultValue.get,
+      dir: String = Utils.createDirectory(tempDir, Random.nextString(5)).toString,
+      hadoopConf: Configuration = new Configuration()
     ): HDFSBackedStateStoreProvider = {
-    val dir = Utils.createDirectory(tempDir, Random.nextString(5)).toString
     val sqlConf = new SQLConf()
     sqlConf.setConf(SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT, minDeltasForSnapshot)
     new HDFSBackedStateStoreProvider(
@@ -534,7 +548,7 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMeth
       keySchema,
       valueSchema,
       new StateStoreConf(sqlConf),
-      new Configuration())
+      hadoopConf)
   }
 
   def remove(store: StateStore, condition: String => Boolean): Unit = {
@@ -598,3 +612,20 @@ private[state] object StateStoreSuite {
     }}.toSet
   }
 }
+
+/**
+ * Fake FileSystem to test that the StateStore throws an exception while committing the
+ * delta file, when `fs.rename` returns `false`.
+ */
+class RenameReturnsFalseFileSystem extends RawLocalFileSystem {
+  import RenameReturnsFalseFileSystem._
+  override def getUri: URI = {
+    URI.create(s"$scheme:///")
+  }
+
+  override def rename(src: Path, dst: Path): Boolean = false
+}
+
+object RenameReturnsFalseFileSystem {
+  val scheme = s"StateStoreSuite${math.abs(Random.nextInt)}fs"
+}

