You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2014/09/16 21:41:50 UTC

git commit: [SPARK-3546] InputStream of ManagedBuffer is not closed and causes running out of file descriptor

Repository: spark
Updated Branches:
  refs/heads/master 84073eb11 -> a9e910430


[SPARK-3546] InputStream of ManagedBuffer is not closed and causes running out of file descriptor

Author: Kousuke Saruta <sa...@oss.nttdata.co.jp>

Closes #2408 from sarutak/resolve-resource-leak-issue and squashes the following commits:

074781d [Kousuke Saruta] Modified ShuffleBlockFetcherIterator
5f63f67 [Kousuke Saruta] Move metrics increment logic and debug logging outside try block
b37231a [Kousuke Saruta] Modified FileSegmentManagedBuffer#nioByteBuffer to check whether the channel is null before invoking channel.close
bf29d4a [Kousuke Saruta] Modified FileSegment to close channel


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a9e91043
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a9e91043
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a9e91043

Branch: refs/heads/master
Commit: a9e910430fb6bb4ef1f6ae20761c43b96bb018df
Parents: 84073eb
Author: Kousuke Saruta <sa...@oss.nttdata.co.jp>
Authored: Tue Sep 16 12:41:45 2014 -0700
Committer: Reynold Xin <rx...@apache.org>
Committed: Tue Sep 16 12:41:45 2014 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/network/ManagedBuffer.scala  | 12 ++++++++++--
 .../spark/storage/ShuffleBlockFetcherIterator.scala     |  2 +-
 2 files changed, 11 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a9e91043/core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala b/core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala
index dcecb6b..e990c1d 100644
--- a/core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala
+++ b/core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala
@@ -19,6 +19,7 @@ package org.apache.spark.network
 
 import java.io.{FileInputStream, RandomAccessFile, File, InputStream}
 import java.nio.ByteBuffer
+import java.nio.channels.FileChannel
 import java.nio.channels.FileChannel.MapMode
 
 import com.google.common.io.ByteStreams
@@ -66,8 +67,15 @@ final class FileSegmentManagedBuffer(val file: File, val offset: Long, val lengt
   override def size: Long = length
 
   override def nioByteBuffer(): ByteBuffer = {
-    val channel = new RandomAccessFile(file, "r").getChannel
-    channel.map(MapMode.READ_ONLY, offset, length)
+    var channel: FileChannel = null
+    try {
+      channel = new RandomAccessFile(file, "r").getChannel
+      channel.map(MapMode.READ_ONLY, offset, length)
+    } finally {
+      if (channel != null) {
+        channel.close()
+      }
+    }
   }
 
   override def inputStream(): InputStream = {

http://git-wip-us.apache.org/repos/asf/spark/blob/a9e91043/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index c8e708a..d868758 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -23,7 +23,7 @@ import scala.collection.mutable.ArrayBuffer
 import scala.collection.mutable.HashSet
 import scala.collection.mutable.Queue
 
-import org.apache.spark.{TaskContext, Logging, SparkException}
+import org.apache.spark.{TaskContext, Logging}
 import org.apache.spark.network.{ManagedBuffer, BlockFetchingListener, BlockTransferService}
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.util.Utils


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org