Posted to commits@spark.apache.org by sr...@apache.org on 2015/04/13 13:18:11 UTC

spark git commit: [SPARK-6762] Fix potential resource leaks in Checkpoint, CheckpointWriter and CheckpointReader

Repository: spark
Updated Branches:
  refs/heads/master 950645d59 -> cadd7d72c


[SPARK-6762] Fix potential resource leaks in Checkpoint, CheckpointWriter and CheckpointReader

The close calls should be placed within a finally block to avoid potential resource leaks.
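
For context, the fix wraps each write/read and its close in a helper modeled on Spark's Utils.tryWithSafeFinally (see the diff below). A minimal, self-contained sketch of that pattern follows; the object name and file path are illustrative only, not the actual Spark implementation:

    import java.io.FileOutputStream

    object SafeFinallyExample {

      // Run `block`, then always run `finallyBlock`. If both throw, the
      // original exception wins and the finally-block failure is recorded
      // as suppressed rather than masking it.
      def tryWithSafeFinally[T](block: => T)(finallyBlock: => Unit): T = {
        var cause: Throwable = null
        try {
          block
        } catch {
          case t: Throwable =>
            cause = t
            throw t
        } finally {
          if (cause != null) {
            try {
              finallyBlock
            } catch {
              case suppressed: Throwable => cause.addSuppressed(suppressed)
            }
          } else {
            finallyBlock
          }
        }
      }

      def main(args: Array[String]): Unit = {
        val fos = new FileOutputStream("checkpoint.tmp")  // hypothetical path
        tryWithSafeFinally {
          fos.write(Array[Byte](1, 2, 3))
        } {
          fos.close()  // runs whether or not write() threw
        }
      }
    }

The extra bookkeeping ensures that a failure inside the finally block does not mask the original exception from the write; it is attached as a suppressed exception instead.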

Author: lisurprise <zh...@intel.com>

Closes #5407 from zhichao-li/master and squashes the following commits:

065999f [lisurprise] add guard for null
ef862d6 [lisurprise] remove fs.close
a754adc [lisurprise] refactor with tryWithSafeFinally
824adb3 [lisurprise] close before validation
c877da7 [lisurprise] Fix potential resource leaks
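
The "add guard for null" and "close before validation" commits correspond to the reader-side shape visible in the diff below: the stream variable is declared outside the guarded block, closed in the finally clause only if it was actually opened, and the deserialized checkpoint is validated only after the close. A simplified sketch of that shape, using placeholder names and plain java.io streams instead of Spark's classes:

    import java.io.{FileInputStream, ObjectInputStream}

    object GuardedReadExample {
      // Declare the stream outside the guarded block so the finally clause
      // can see it, close it only if it was opened, and use the result only
      // after the stream has been closed.
      def readObjectFrom(path: String): AnyRef = {
        var ois: ObjectInputStream = null
        var result: AnyRef = null
        try {
          ois = new ObjectInputStream(new FileInputStream(path))
          result = ois.readObject()
        } finally {
          if (ois != null) {
            ois.close()
          }
        }
        result // in Spark, cp.validate() runs here, after the close above
      }
    }

The "remove fs.close" commit drops the call that closed the Hadoop FileSystem handle, likely because FileSystem instances obtained from FileSystem.get are typically cached and shared, so closing one can break other users of the same filesystem.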


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cadd7d72
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cadd7d72
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cadd7d72

Branch: refs/heads/master
Commit: cadd7d72c52ccc8d2def405a77dcf807fb5c17c2
Parents: 950645d
Author: lisurprise <zh...@intel.com>
Authored: Mon Apr 13 12:18:05 2015 +0100
Committer: Sean Owen <so...@cloudera.com>
Committed: Mon Apr 13 12:18:05 2015 +0100

----------------------------------------------------------------------
 .../org/apache/spark/streaming/Checkpoint.scala | 47 ++++++++++++--------
 .../spark/streaming/util/RawTextSender.scala    |  3 +-
 2 files changed, 31 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/cadd7d72/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 28703ef..0a50485 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -26,7 +26,7 @@ import org.apache.hadoop.conf.Configuration
 
 import org.apache.spark.{SparkException, SparkConf, Logging}
 import org.apache.spark.io.CompressionCodec
-import org.apache.spark.util.MetadataCleaner
+import org.apache.spark.util.{MetadataCleaner, Utils}
 import org.apache.spark.streaming.scheduler.JobGenerator
 
 
@@ -139,8 +139,11 @@ class CheckpointWriter(
           // Write checkpoint to temp file
           fs.delete(tempFile, true)   // just in case it exists
           val fos = fs.create(tempFile)
-          fos.write(bytes)
-          fos.close()
+          Utils.tryWithSafeFinally {
+            fos.write(bytes)
+          } {
+            fos.close()
+          }
 
           // If the checkpoint file exists, back it up
           // If the backup exists as well, just delete it, otherwise rename will fail
@@ -187,9 +190,11 @@ class CheckpointWriter(
     val bos = new ByteArrayOutputStream()
     val zos = compressionCodec.compressedOutputStream(bos)
     val oos = new ObjectOutputStream(zos)
-    oos.writeObject(checkpoint)
-    oos.close()
-    bos.close()
+    Utils.tryWithSafeFinally {
+      oos.writeObject(checkpoint)
+    } {
+      oos.close()
+    }
     try {
       executor.execute(new CheckpointWriteHandler(
         checkpoint.checkpointTime, bos.toByteArray, clearCheckpointDataLater))
@@ -248,18 +253,24 @@ object CheckpointReader extends Logging {
     checkpointFiles.foreach(file => {
       logInfo("Attempting to load checkpoint from file " + file)
       try {
-        val fis = fs.open(file)
-        // ObjectInputStream uses the last defined user-defined class loader in the stack
-        // to find classes, which maybe the wrong class loader. Hence, a inherited version
-        // of ObjectInputStream is used to explicitly use the current thread's default class
-        // loader to find and load classes. This is a well know Java issue and has popped up
-        // in other places (e.g., http://jira.codehaus.org/browse/GROOVY-1627)
-        val zis = compressionCodec.compressedInputStream(fis)
-        val ois = new ObjectInputStreamWithLoader(zis,
-          Thread.currentThread().getContextClassLoader)
-        val cp = ois.readObject.asInstanceOf[Checkpoint]
-        ois.close()
-        fs.close()
+        var ois: ObjectInputStreamWithLoader = null
+        var cp: Checkpoint = null
+        Utils.tryWithSafeFinally {
+          val fis = fs.open(file)
+          // ObjectInputStream uses the last defined user-defined class loader in the stack
+          // to find classes, which maybe the wrong class loader. Hence, a inherited version
+          // of ObjectInputStream is used to explicitly use the current thread's default class
+          // loader to find and load classes. This is a well know Java issue and has popped up
+          // in other places (e.g., http://jira.codehaus.org/browse/GROOVY-1627)
+          val zis = compressionCodec.compressedInputStream(fis)
+          ois = new ObjectInputStreamWithLoader(zis,
+            Thread.currentThread().getContextClassLoader)
+          cp = ois.readObject.asInstanceOf[Checkpoint]
+        } {
+          if (ois != null) {
+            ois.close()
+          }
+        }
         cp.validate()
         logInfo("Checkpoint successfully loaded from file " + file)
         logInfo("Checkpoint was generated at time " + cp.checkpointTime)

http://git-wip-us.apache.org/repos/asf/spark/blob/cadd7d72/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala
index a785081..ca2f319 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala
@@ -72,7 +72,8 @@ object RawTextSender extends Logging {
       } catch {
         case e: IOException =>
           logError("Client disconnected")
-          socket.close()
+      } finally {
+        socket.close()
       }
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org