Posted to commits@spark.apache.org by rx...@apache.org on 2013/11/05 19:43:15 UTC

[2/3] git commit: Never store shuffle blocks in BlockManager

Never store shuffle blocks in BlockManager

After the BlockId refactor (PR #114), it became very clear that ShuffleBlockInfo objects are
of no use within BlockManager (they had a no-arg constructor!). This patch eliminates them
completely, saving us around 100-150 bytes per shuffle block.
The total, system-wide overhead per shuffle block is now a flat 8 bytes, excluding
state saved by the MapOutputTracker.
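
As a rough illustration of what that means, here is a toy, self-contained Scala
model of the per-block state before and after the patch. Names such as
OldShuffleInfo are invented for this sketch and are not Spark's classes; the
shape of the change matches the hunks below.

import java.util.concurrent.ConcurrentHashMap

object ShuffleOverheadModel extends App {
  // Before: each shuffle block registered a ShuffleBlockInfo-like object in
  // BlockManager's concurrent blockInfo map, keyed by the block's id.
  class OldShuffleInfo { @volatile var size: Long = -1L } // -1L encodes "pending"
  val blockInfo = new ConcurrentHashMap[String, OldShuffleInfo]()
  val info = new OldShuffleInfo
  blockInfo.put("shuffle_0_1_2", info)
  info.size = 4096L // filled in when the writer closed (markReady)

  // After: nothing is registered with the BlockManager at all. The block's
  // only system-wide footprint is the 8-byte length held by its file segment.
  val segmentLength: Long = 4096L

  println(s"map entries before: ${blockInfo.size()}; after: a single Long ($segmentLength)")
}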


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/7eaa4617
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/7eaa4617
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/7eaa4617

Branch: refs/heads/branch-0.8
Commit: 7eaa461767105485228564c535967296f1991b11
Parents: 07b3f01
Author: Aaron Davidson <aa...@databricks.com>
Authored: Fri Nov 1 19:25:23 2013 -0700
Committer: Aaron Davidson <aa...@databricks.com>
Committed: Mon Nov 4 23:44:03 2013 -0800

----------------------------------------------------------------------
 .../org/apache/spark/storage/BlockInfo.scala      | 18 +-----------------
 .../org/apache/spark/storage/BlockManager.scala   | 10 ++--------
 .../apache/spark/storage/BlockObjectWriter.scala  | 12 +-----------
 3 files changed, 4 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/7eaa4617/core/src/main/scala/org/apache/spark/storage/BlockInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockInfo.scala b/core/src/main/scala/org/apache/spark/storage/BlockInfo.scala
index dbe0bda..c8f3976 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockInfo.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockInfo.scala
@@ -19,9 +19,7 @@ package org.apache.spark.storage
 
 import java.util.concurrent.ConcurrentHashMap
 
-private[storage] trait BlockInfo {
-  def level: StorageLevel
-  def tellMaster: Boolean
+private[storage] class BlockInfo(val level: StorageLevel, val tellMaster: Boolean) {
   // To save space, 'pending' and 'failed' are encoded as special sizes:
   @volatile var size: Long = BlockInfo.BLOCK_PENDING
   private def pending: Boolean = size == BlockInfo.BLOCK_PENDING
@@ -81,17 +79,3 @@ private object BlockInfo {
   private val BLOCK_PENDING: Long = -1L
   private val BLOCK_FAILED: Long = -2L
 }
-
-// All shuffle blocks have the same `level` and `tellMaster` properties,
-// so we can save space by not storing them in each instance:
-private[storage] class ShuffleBlockInfo extends BlockInfo {
-  // These need to be defined using 'def' instead of 'val' in order for
-  // the compiler to eliminate the fields:
-  def level: StorageLevel = StorageLevel.DISK_ONLY
-  def tellMaster: Boolean = false
-}
-
-private[storage] class BlockInfoImpl(val level: StorageLevel, val tellMaster: Boolean)
-  extends BlockInfo {
-  // Intentionally left blank
-}
\ No newline at end of file
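
Aside: the comment in the deleted ShuffleBlockInfo leans on the fact that a
Scala `def` member compiles to a method with no backing field, while a
constructor `val` adds a field to every instance. A standalone toy
illustration (names invented for this sketch, not Spark's):

trait Info {
  def level: String
  def tellMaster: Boolean
}

// All instances share the two methods; the compiler emits no per-instance fields.
class ConstantInfo extends Info {
  def level: String = "DISK_ONLY"
  def tellMaster: Boolean = false
}

// Each instance carries two fields: a reference and a boolean.
class PerInstanceInfo(val level: String, val tellMaster: Boolean) extends Info

object FieldDemo extends App {
  println(new ConstantInfo().level)                       // DISK_ONLY
  println(new PerInstanceInfo("MEMORY_ONLY", true).level) // MEMORY_ONLY
}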

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/7eaa4617/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index fbedfbc..a34c95b 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -465,13 +465,7 @@ private[spark] class BlockManager(
   def getDiskWriter(blockId: BlockId, file: File, serializer: Serializer, bufferSize: Int)
     : BlockObjectWriter = {
     val compressStream: OutputStream => OutputStream = wrapForCompression(blockId, _)
-    val writer = new DiskBlockObjectWriter(blockId, file, serializer, bufferSize, compressStream)
-    writer.registerCloseEventHandler(() => {
-      val myInfo = new ShuffleBlockInfo()
-      blockInfo.put(blockId, myInfo)
-      myInfo.markReady(writer.fileSegment().length)
-    })
-    writer
+    new DiskBlockObjectWriter(blockId, file, serializer, bufferSize, compressStream)
   }
 
   /**
@@ -501,7 +495,7 @@ private[spark] class BlockManager(
     // to be dropped right after it got put into memory. Note, however, that other threads will
     // not be able to get() this block until we call markReady on its BlockInfo.
     val myInfo = {
-      val tinfo = new BlockInfoImpl(level, tellMaster)
+      val tinfo = new BlockInfo(level, tellMaster)
       // Do atomically !
       val oldBlockOpt = blockInfo.putIfAbsent(blockId, tinfo)
 

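For reference, the atomic registration that the surviving put() path relies on
looks like this in isolation. This is a toy sketch with simplified types, not
Spark's classes; only the putIfAbsent pattern is taken from the hunk above.

import java.util.concurrent.ConcurrentHashMap

object AtomicRegistrationSketch extends App {
  class BlockInfoLike(val level: String, val tellMaster: Boolean)
  val blockInfo = new ConcurrentHashMap[String, BlockInfoLike]()

  val tinfo = new BlockInfoLike("MEMORY_AND_DISK", true)
  // putIfAbsent returns null if our entry won the race, otherwise the entry
  // some other thread registered first -- wrap it in Option to pattern match.
  Option(blockInfo.putIfAbsent("rdd_0_0", tinfo)) match {
    case Some(existing) => println(s"block already being put (level=${existing.level})")
    case None           => println("registered; safe to proceed with the store")
  }
}
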
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/7eaa4617/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
index e49c191..469e68f 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
@@ -34,20 +34,12 @@ import org.apache.spark.serializer.{SerializationStream, Serializer}
  */
 abstract class BlockObjectWriter(val blockId: BlockId) {
 
-  var closeEventHandler: () => Unit = _
-
   def open(): BlockObjectWriter
 
-  def close() {
-    closeEventHandler()
-  }
+  def close()
 
   def isOpen: Boolean
 
-  def registerCloseEventHandler(handler: () => Unit) {
-    closeEventHandler = handler
-  }
-
   /**
    * Flush the partial writes and commit them as a single atomic block. Return the
    * number of bytes written for this commit.
@@ -146,8 +138,6 @@ class DiskBlockObjectWriter(
       ts = null
       objOut = null
     }
-    // Invoke the close callback handler.
-    super.close()
   }
 
   override def isOpen: Boolean = objOut != null
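
With the handler plumbing gone, close() becomes a plain abstract method and
each subclass owns its own teardown. A toy model of the simplified contract
(illustrative names only, not Spark's API):

import java.io.{ByteArrayOutputStream, OutputStream}

abstract class WriterLike(val blockId: String) {
  def open(): WriterLike
  def close(): Unit // subclasses tear themselves down; no registered callbacks
  def isOpen: Boolean
}

class DiskWriterLike(blockId: String) extends WriterLike(blockId) {
  private var stream: OutputStream = _
  def open(): WriterLike = { stream = new ByteArrayOutputStream(); this }
  def close(): Unit = if (stream != null) { stream.close(); stream = null }
  def isOpen: Boolean = stream != null
}

object CloseDemo extends App {
  val w = new DiskWriterLike("shuffle_0_0_0").open()
  w.close()
  println(s"open after close? ${w.isOpen}") // false
}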