You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by zs...@apache.org on 2016/11/14 18:03:04 UTC

spark git commit: [SPARK-18416][STRUCTURED STREAMING] Fixed temp file leak in state store

Repository: spark
Updated Branches:
  refs/heads/master 9d07ceee7 -> bdfe60ac9


[SPARK-18416][STRUCTURED STREAMING] Fixed temp file leak in state store

## What changes were proposed in this pull request?

StateStore.get() causes a temporary file to be created immediately, even if the store is never used to make updates for a new version. In those cases store.commit() is never called, so the temp file is never closed and its output stream stays open forever.

This PR fixes it by opening the temp file lazily, i.e. only when an update is actually made or the store is committed.

## How was this patch tested?

New unit test

Author: Tathagata Das <ta...@gmail.com>

Closes #15859 from tdas/SPARK-18416.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bdfe60ac
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bdfe60ac
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bdfe60ac

Branch: refs/heads/master
Commit: bdfe60ac921172be0fb77de2f075cc7904a3b238
Parents: 9d07cee
Author: Tathagata Das <ta...@gmail.com>
Authored: Mon Nov 14 10:03:01 2016 -0800
Committer: Shixiong Zhu <sh...@databricks.com>
Committed: Mon Nov 14 10:03:01 2016 -0800

----------------------------------------------------------------------
 .../state/HDFSBackedStateStoreProvider.scala    | 10 ++--
 .../streaming/state/StateStoreSuite.scala       | 63 ++++++++++++++++++++
 2 files changed, 68 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/bdfe60ac/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
index 8087131..f07feaa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
@@ -87,8 +87,7 @@ private[state] class HDFSBackedStateStoreProvider(
 
     private val newVersion = version + 1
     private val tempDeltaFile = new Path(baseDir, s"temp-${Random.nextLong}")
-    private val tempDeltaFileStream = compressStream(fs.create(tempDeltaFile, true))
-
+    private lazy val tempDeltaFileStream = compressStream(fs.create(tempDeltaFile, true))
     private val allUpdates = new java.util.HashMap[UnsafeRow, StoreUpdate]()
 
     @volatile private var state: STATE = UPDATING
@@ -101,7 +100,7 @@ private[state] class HDFSBackedStateStoreProvider(
     }
 
     override def put(key: UnsafeRow, value: UnsafeRow): Unit = {
-      verify(state == UPDATING, "Cannot remove after already committed or aborted")
+      verify(state == UPDATING, "Cannot put after already committed or aborted")
 
       val isNewKey = !mapToUpdate.containsKey(key)
       mapToUpdate.put(key, value)
@@ -125,6 +124,7 @@ private[state] class HDFSBackedStateStoreProvider(
     /** Remove keys that match the following condition */
     override def remove(condition: UnsafeRow => Boolean): Unit = {
       verify(state == UPDATING, "Cannot remove after already committed or aborted")
+
       val keyIter = mapToUpdate.keySet().iterator()
       while (keyIter.hasNext) {
         val key = keyIter.next
@@ -154,7 +154,7 @@ private[state] class HDFSBackedStateStoreProvider(
         finalizeDeltaFile(tempDeltaFileStream)
         finalDeltaFile = commitUpdates(newVersion, mapToUpdate, tempDeltaFile)
         state = COMMITTED
-        logInfo(s"Committed version $newVersion for $this")
+        logInfo(s"Committed version $newVersion for $this to file $finalDeltaFile")
         newVersion
       } catch {
         case NonFatal(e) =>
@@ -174,7 +174,7 @@ private[state] class HDFSBackedStateStoreProvider(
       if (tempDeltaFile != null) {
         fs.delete(tempDeltaFile, true)
       }
-      logInfo("Aborted")
+      logInfo(s"Aborted version $newVersion for $this")
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/spark/blob/bdfe60ac/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
index 504a265..533cd0c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
@@ -468,6 +468,69 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMeth
     assert(e.getCause.getMessage.contains("Failed to rename"))
   }
 
+  test("SPARK-18416: do not create temp delta file until the store is updated") {
+    val dir = Utils.createDirectory(tempDir, Random.nextString(5)).toString
+    val storeId = StateStoreId(dir, 0, 0)
+    val storeConf = StateStoreConf.empty
+    val hadoopConf = new Configuration()
+    val deltaFileDir = new File(s"$dir/0/0/")
+
+    def numTempFiles: Int = {
+      if (deltaFileDir.exists) {
+        deltaFileDir.listFiles.map(_.getName).count(n => n.contains("temp") && !n.startsWith("."))
+      } else 0
+    }
+
+    def numDeltaFiles: Int = {
+      if (deltaFileDir.exists) {
+        deltaFileDir.listFiles.map(_.getName).count(n => n.contains(".delta") && !n.startsWith("."))
+      } else 0
+    }
+
+    def shouldNotCreateTempFile[T](body: => T): T = {
+      val before = numTempFiles
+      val result = body
+      assert(numTempFiles === before)
+      result
+    }
+
+    // Getting the store should not create temp file
+    val store0 = shouldNotCreateTempFile {
+      StateStore.get(storeId, keySchema, valueSchema, 0, storeConf, hadoopConf)
+    }
+
+    // Put should create a temp file
+    put(store0, "a", 1)
+    assert(numTempFiles === 1)
+    assert(numDeltaFiles === 0)
+
+    // Commit should remove temp file and create a delta file
+    store0.commit()
+    assert(numTempFiles === 0)
+    assert(numDeltaFiles === 1)
+
+    // Remove should create a temp file
+    val store1 = shouldNotCreateTempFile {
+      StateStore.get(storeId, keySchema, valueSchema, 1, storeConf, hadoopConf)
+    }
+    remove(store1, _ == "a")
+    assert(numTempFiles === 1)
+    assert(numDeltaFiles === 1)
+
+    // Commit should remove temp file and create a delta file
+    store1.commit()
+    assert(numTempFiles === 0)
+    assert(numDeltaFiles === 2)
+
+    // Commit without any updates should create a delta file
+    val store2 = shouldNotCreateTempFile {
+      StateStore.get(storeId, keySchema, valueSchema, 2, storeConf, hadoopConf)
+    }
+    store2.commit()
+    assert(numTempFiles === 0)
+    assert(numDeltaFiles === 3)
+  }
+
   def getDataFromFiles(
       provider: HDFSBackedStateStoreProvider,
     version: Int = -1): Set[(String, Int)] = {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org