You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2015/04/13 13:18:11 UTC
spark git commit: [SPARK-6762]Fix potential resource leaks in
CheckPoint CheckpointWriter and CheckpointReader
Repository: spark
Updated Branches:
refs/heads/master 950645d59 -> cadd7d72c
[SPARK-6762]Fix potential resource leaks in CheckPoint CheckpointWriter and CheckpointReader
The close action should be placed within finally block to avoid the potential resource leaks
Author: lisurprise <zh...@intel.com>
Closes #5407 from zhichao-li/master and squashes the following commits:
065999f [lisurprise] add guard for null
ef862d6 [lisurprise] remove fs.close
a754adc [lisurprise] refactor with tryWithSafeFinally
824adb3 [lisurprise] close before validation
c877da7 [lisurprise] Fix potential resource leaks
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cadd7d72
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cadd7d72
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cadd7d72
Branch: refs/heads/master
Commit: cadd7d72c52ccc8d2def405a77dcf807fb5c17c2
Parents: 950645d
Author: lisurprise <zh...@intel.com>
Authored: Mon Apr 13 12:18:05 2015 +0100
Committer: Sean Owen <so...@cloudera.com>
Committed: Mon Apr 13 12:18:05 2015 +0100
----------------------------------------------------------------------
.../org/apache/spark/streaming/Checkpoint.scala | 47 ++++++++++++--------
.../spark/streaming/util/RawTextSender.scala | 3 +-
2 files changed, 31 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/cadd7d72/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 28703ef..0a50485 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -26,7 +26,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.spark.{SparkException, SparkConf, Logging}
import org.apache.spark.io.CompressionCodec
-import org.apache.spark.util.MetadataCleaner
+import org.apache.spark.util.{MetadataCleaner, Utils}
import org.apache.spark.streaming.scheduler.JobGenerator
@@ -139,8 +139,11 @@ class CheckpointWriter(
// Write checkpoint to temp file
fs.delete(tempFile, true) // just in case it exists
val fos = fs.create(tempFile)
- fos.write(bytes)
- fos.close()
+ Utils.tryWithSafeFinally {
+ fos.write(bytes)
+ } {
+ fos.close()
+ }
// If the checkpoint file exists, back it up
// If the backup exists as well, just delete it, otherwise rename will fail
@@ -187,9 +190,11 @@ class CheckpointWriter(
val bos = new ByteArrayOutputStream()
val zos = compressionCodec.compressedOutputStream(bos)
val oos = new ObjectOutputStream(zos)
- oos.writeObject(checkpoint)
- oos.close()
- bos.close()
+ Utils.tryWithSafeFinally {
+ oos.writeObject(checkpoint)
+ } {
+ oos.close()
+ }
try {
executor.execute(new CheckpointWriteHandler(
checkpoint.checkpointTime, bos.toByteArray, clearCheckpointDataLater))
@@ -248,18 +253,24 @@ object CheckpointReader extends Logging {
checkpointFiles.foreach(file => {
logInfo("Attempting to load checkpoint from file " + file)
try {
- val fis = fs.open(file)
- // ObjectInputStream uses the last defined user-defined class loader in the stack
- // to find classes, which maybe the wrong class loader. Hence, a inherited version
- // of ObjectInputStream is used to explicitly use the current thread's default class
- // loader to find and load classes. This is a well know Java issue and has popped up
- // in other places (e.g., http://jira.codehaus.org/browse/GROOVY-1627)
- val zis = compressionCodec.compressedInputStream(fis)
- val ois = new ObjectInputStreamWithLoader(zis,
- Thread.currentThread().getContextClassLoader)
- val cp = ois.readObject.asInstanceOf[Checkpoint]
- ois.close()
- fs.close()
+ var ois: ObjectInputStreamWithLoader = null
+ var cp: Checkpoint = null
+ Utils.tryWithSafeFinally {
+ val fis = fs.open(file)
+ // ObjectInputStream uses the last defined user-defined class loader in the stack
+ // to find classes, which maybe the wrong class loader. Hence, a inherited version
+ // of ObjectInputStream is used to explicitly use the current thread's default class
+ // loader to find and load classes. This is a well know Java issue and has popped up
+ // in other places (e.g., http://jira.codehaus.org/browse/GROOVY-1627)
+ val zis = compressionCodec.compressedInputStream(fis)
+ ois = new ObjectInputStreamWithLoader(zis,
+ Thread.currentThread().getContextClassLoader)
+ cp = ois.readObject.asInstanceOf[Checkpoint]
+ } {
+ if (ois != null) {
+ ois.close()
+ }
+ }
cp.validate()
logInfo("Checkpoint successfully loaded from file " + file)
logInfo("Checkpoint was generated at time " + cp.checkpointTime)
http://git-wip-us.apache.org/repos/asf/spark/blob/cadd7d72/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala
index a785081..ca2f319 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala
@@ -72,7 +72,8 @@ object RawTextSender extends Logging {
} catch {
case e: IOException =>
logError("Client disconnected")
- socket.close()
+ } finally {
+ socket.close()
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org