Posted to commits@spark.apache.org by an...@apache.org on 2015/02/19 19:38:02 UTC

spark git commit: [SPARK-5423][Core] Cleanup resources in DiskMapIterator.finalize to ensure deleting the temp file

Repository: spark
Updated Branches:
  refs/heads/master 38e624a73 -> 90095bf3c


[SPARK-5423][Core] Cleanup resources in DiskMapIterator.finalize to ensure deleting the temp file

This PR adds a `finalize` method to DiskMapIterator to clean up its resources and ensure the temp file is deleted even if an exception occurs while processing the data.
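
In isolation, the pattern looks like the following minimal sketch (class and member names here are illustrative, not the actual Spark code; the real change is in the diff below): a volatile `closed` flag makes cleanup idempotent, and `finalize` acts only as a safety net when the normal cleanup path is never reached.

    import java.io.{File, FileInputStream}

    // Hypothetical reader that owns a temp file and an input stream, mirroring the
    // approach taken by DiskMapIterator: cleanup() is idempotent, and finalize()
    // is only a last-resort safety net if iteration is interrupted by an exception.
    class TempFileReader(file: File) {
      // Volatile so that finalize(), which may run on any thread, sees the latest values.
      @volatile private var closed = false
      @volatile private var in: FileInputStream = new FileInputStream(file)

      def cleanup(): Unit = {
        if (!closed) {
          closed = true
          try {
            val s = in
            in = null
            if (s != null) {
              s.close()
            }
          } finally {
            // Always attempt to delete the temp file, even if close() throws.
            if (file.exists()) {
              file.delete()
            }
          }
        }
      }

      override def finalize(): Unit = {
        try {
          cleanup()
        } finally {
          super.finalize()
        }
      }
    }

Callers would still invoke `cleanup()` (or fully drain the iterator) on the normal path; relying on `finalize` alone only defers the file deletion until the object is garbage-collected.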

Author: zsxwing <zs...@gmail.com>

Closes #4219 from zsxwing/SPARK-5423 and squashes the following commits:

d4b2ca6 [zsxwing] Cleanup resources in DiskMapIterator.finalize to ensure deleting the temp file


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/90095bf3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/90095bf3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/90095bf3

Branch: refs/heads/master
Commit: 90095bf3ce9304d09a32ceffaa99069079071b59
Parents: 38e624a
Author: zsxwing <zs...@gmail.com>
Authored: Thu Feb 19 18:37:31 2015 +0000
Committer: Ubuntu <ub...@ip-172-31-36-14.us-west-2.compute.internal>
Committed: Thu Feb 19 18:37:31 2015 +0000

----------------------------------------------------------------------
 .../util/collection/ExternalAppendOnlyMap.scala | 52 ++++++++++++++++----
 1 file changed, 43 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/90095bf3/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 8a0f5a6..fc7e86e 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -387,6 +387,15 @@ class ExternalAppendOnlyMap[K, V, C](
     private var batchIndex = 0  // Which batch we're in
     private var fileStream: FileInputStream = null
 
+    @volatile private var closed = false
+
+    // A volatile variable to remember which DeserializationStream is in use. It must be set
+    // whenever we open a DeserializationStream. However, we should read the content through
+    // `deserializeStream` rather than `deserializeStreamToBeClosed`, because touching a volatile
+    // variable on every read would hurt performance. It must be volatile so that its correct
+    // value is visible in the `finalize` method, which could run in any thread.
+    @volatile private var deserializeStreamToBeClosed: DeserializationStream = null
+
     // An intermediate stream that reads from exactly one batch
     // This guards against pre-fetching and other arbitrary behavior of higher level streams
     private var deserializeStream = nextBatchStream()
@@ -401,6 +410,7 @@ class ExternalAppendOnlyMap[K, V, C](
       // we're still in a valid batch.
       if (batchIndex < batchOffsets.length - 1) {
         if (deserializeStream != null) {
+          deserializeStreamToBeClosed = null
           deserializeStream.close()
           fileStream.close()
           deserializeStream = null
@@ -419,7 +429,11 @@ class ExternalAppendOnlyMap[K, V, C](
 
         val bufferedStream = new BufferedInputStream(ByteStreams.limit(fileStream, end - start))
         val compressedStream = blockManager.wrapForCompression(blockId, bufferedStream)
-        ser.deserializeStream(compressedStream)
+        // Before returning the stream, assign it to `deserializeStreamToBeClosed` so that we can
+        // close it in `finalize`, without having to touch the volatile
+        // `deserializeStreamToBeClosed` while reading the (K, C) pairs.
+        deserializeStreamToBeClosed = ser.deserializeStream(compressedStream)
+        deserializeStreamToBeClosed
       } else {
         // No more batches left
         cleanup()
@@ -468,14 +482,34 @@ class ExternalAppendOnlyMap[K, V, C](
       item
     }
 
-    // TODO: Ensure this gets called even if the iterator isn't drained.
-    private def cleanup() {
-      batchIndex = batchOffsets.length  // Prevent reading any other batch
-      val ds = deserializeStream
-      deserializeStream = null
-      fileStream = null
-      ds.close()
-      file.delete()
+    // TODO: For now we only rely on `finalize` to ensure the resources get cleaned up. In the
+    // future, we need some mechanism to ensure cleanup runs as soon as the resources are unused.
+    private def cleanup(): Unit = {
+      if (!closed) {
+        closed = true
+        batchIndex = batchOffsets.length  // Prevent reading any other batch
+        fileStream = null
+        try {
+          val ds = deserializeStreamToBeClosed
+          deserializeStreamToBeClosed = null
+          deserializeStream = null
+          if (ds != null) {
+            ds.close()
+          }
+        } finally {
+          if (file.exists()) {
+            file.delete()
+          }
+        }
+      }
+    }
+
+    override def finalize(): Unit = {
+      try {
+        cleanup()
+      } finally {
+        super.finalize()
+      }
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org