You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by an...@apache.org on 2015/02/19 19:38:02 UTC
spark git commit: [SPARK-5423][Core] Cleanup resources in DiskMapIterator.finalize to ensure deleting the temp file
Repository: spark
Updated Branches:
refs/heads/master 38e624a73 -> 90095bf3c
[SPARK-5423][Core] Cleanup resources in DiskMapIterator.finalize to ensure deleting the temp file
This PR adds a `finalize` method to DiskMapIterator that cleans up its resources even if an exception occurs while processing data.
Author: zsxwing <zs...@gmail.com>
Closes #4219 from zsxwing/SPARK-5423 and squashes the following commits:
d4b2ca6 [zsxwing] Cleanup resources in DiskMapIterator.finalize to ensure deleting the temp file
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/90095bf3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/90095bf3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/90095bf3
Branch: refs/heads/master
Commit: 90095bf3ce9304d09a32ceffaa99069079071b59
Parents: 38e624a
Author: zsxwing <zs...@gmail.com>
Authored: Thu Feb 19 18:37:31 2015 +0000
Committer: Ubuntu <ub...@ip-172-31-36-14.us-west-2.compute.internal>
Committed: Thu Feb 19 18:37:31 2015 +0000
----------------------------------------------------------------------
.../util/collection/ExternalAppendOnlyMap.scala | 52 ++++++++++++++++----
1 file changed, 43 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/90095bf3/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 8a0f5a6..fc7e86e 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -387,6 +387,15 @@ class ExternalAppendOnlyMap[K, V, C](
private var batchIndex = 0 // Which batch we're in
private var fileStream: FileInputStream = null
+ @volatile private var closed = false
+
+ // A volatile variable to remember which DeserializationStream is using. Need to set it when we
+ // open a DeserializationStream. But we should use `deserializeStream` rather than
+ // `deserializeStreamToBeClosed` to read the content because touching a volatile variable will
+ // reduce the performance. It must be volatile so that we can see its correct value in the
+ // `finalize` method, which could run in any thread.
+ @volatile private var deserializeStreamToBeClosed: DeserializationStream = null
+
// An intermediate stream that reads from exactly one batch
// This guards against pre-fetching and other arbitrary behavior of higher level streams
private var deserializeStream = nextBatchStream()
@@ -401,6 +410,7 @@ class ExternalAppendOnlyMap[K, V, C](
// we're still in a valid batch.
if (batchIndex < batchOffsets.length - 1) {
if (deserializeStream != null) {
+ deserializeStreamToBeClosed = null
deserializeStream.close()
fileStream.close()
deserializeStream = null
@@ -419,7 +429,11 @@ class ExternalAppendOnlyMap[K, V, C](
val bufferedStream = new BufferedInputStream(ByteStreams.limit(fileStream, end - start))
val compressedStream = blockManager.wrapForCompression(blockId, bufferedStream)
- ser.deserializeStream(compressedStream)
+ // Before returning the stream, assign it to `deserializeStreamToBeClosed` so that we can
+ // close it in `finalize` and also avoid to touch the volatile `deserializeStreamToBeClosed`
+ // during reading the (K, C) pairs.
+ deserializeStreamToBeClosed = ser.deserializeStream(compressedStream)
+ deserializeStreamToBeClosed
} else {
// No more batches left
cleanup()
@@ -468,14 +482,34 @@ class ExternalAppendOnlyMap[K, V, C](
item
}
- // TODO: Ensure this gets called even if the iterator isn't drained.
- private def cleanup() {
- batchIndex = batchOffsets.length // Prevent reading any other batch
- val ds = deserializeStream
- deserializeStream = null
- fileStream = null
- ds.close()
- file.delete()
+ // TODO: Now only use `finalize` to ensure `close` gets called to clean up the resources. In the
+ // future, we need some mechanism to ensure this gets called once the resources are not used.
+ private def cleanup(): Unit = {
+ if (!closed) {
+ closed = true
+ batchIndex = batchOffsets.length // Prevent reading any other batch
+ fileStream = null
+ try {
+ val ds = deserializeStreamToBeClosed
+ deserializeStreamToBeClosed = null
+ deserializeStream = null
+ if (ds != null) {
+ ds.close()
+ }
+ } finally {
+ if (file.exists()) {
+ file.delete()
+ }
+ }
+ }
+ }
+
+ override def finalize(): Unit = {
+ try {
+ cleanup()
+ } finally {
+ super.finalize()
+ }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org