You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2013/11/22 03:14:39 UTC

git commit: Merge pull request #196 from pwendell/master

Updated Branches:
  refs/heads/branch-0.8 f678e1005 -> d7c6a00cb


Merge pull request #196 from pwendell/master

TimeTrackingOutputStream should pass on calls to close() and flush().

Without this fix, close() and flush() never reach the wrapped stream, so you get a huge number of open files when running shuffles.

(cherry picked from commit f20093c3afa68439b1c9010de189d497df787c2a)
Signed-off-by: Reynold Xin <rx...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/d7c6a00c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/d7c6a00c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/d7c6a00c

Branch: refs/heads/branch-0.8
Commit: d7c6a00cb6f9d80439c998f7b8673fa58848e61e
Parents: f678e10
Author: Reynold Xin <rx...@apache.org>
Authored: Fri Nov 22 10:12:13 2013 +0800
Committer: Reynold Xin <rx...@apache.org>
Committed: Fri Nov 22 10:13:37 2013 +0800

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/storage/BlockObjectWriter.scala    | 2 ++
 1 file changed, 2 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d7c6a00c/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
index 469e68f..b4451fc 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
@@ -93,6 +93,8 @@ class DiskBlockObjectWriter(
     def write(i: Int): Unit = callWithTiming(out.write(i))
     override def write(b: Array[Byte]) = callWithTiming(out.write(b))
     override def write(b: Array[Byte], off: Int, len: Int) = callWithTiming(out.write(b, off, len))
+    override def close() = out.close()
+    override def flush() = out.flush()
   }
 
   private val syncWrites = System.getProperty("spark.shuffle.sync", "false").toBoolean