Posted to commits@spark.apache.org by rx...@apache.org on 2013/11/05 19:43:14 UTC

[1/3] git commit: Merge pull request #75 from JoshRosen/block-manager-cleanup

Updated Branches:
  refs/heads/branch-0.8 08481679c -> 96670e716


Merge pull request #75 from JoshRosen/block-manager-cleanup

Code de-duplication in BlockManager

The BlockManager has a few methods that duplicate most of their code.  This pull request extracts the duplicated code into private doPut(), doGetLocal(), and doGetRemote() methods that unify the storing/reading of bytes or objects.
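
To illustrate the shape of the refactoring, here is a minimal standalone sketch of the asValues-flag pattern that doGetLocal() uses (Store and Reader are hypothetical stand-ins for this sketch, not the actual Spark classes; the real method also handles block locking and memory caching, as the diff below shows):

    import java.nio.ByteBuffer

    // Hypothetical stand-in for MemoryStore/DiskStore, just to make the
    // sketch self-contained:
    trait Store {
      def getValues(id: String): Option[Iterator[Any]]
      def getBytes(id: String): Option[ByteBuffer]
    }

    class Reader(memoryStore: Store, diskStore: Store) {
      // The public wrappers keep their old signatures and simply delegate:
      def getLocal(id: String): Option[Iterator[Any]] =
        doGetLocal(id, asValues = true).asInstanceOf[Option[Iterator[Any]]]

      def getLocalBytes(id: String): Option[ByteBuffer] =
        doGetLocal(id, asValues = false).asInstanceOf[Option[ByteBuffer]]

      // One private method holds the lookup logic that was previously
      // duplicated; the asValues flag picks the values or bytes variant
      // at each branch point.
      private def doGetLocal(id: String, asValues: Boolean): Option[Any] = {
        val fromMemory: Option[Any] =
          if (asValues) memoryStore.getValues(id) else memoryStore.getBytes(id)
        fromMemory.orElse {
          if (asValues) diskStore.getValues(id) else diskStore.getBytes(id)
        }
      }
    }

The cost of returning Option[Any] and casting in the wrappers is what buys a single copy of the shared control flow.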

I believe that I preserved the logic of the original code, but I'd appreciate some help in reviewing this.
(cherry picked from commit edc5e3f8f44a658e9829f2ee65d5fb32b464121b)

Signed-off-by: Aaron Davidson <aa...@databricks.com>

Conflicts:
	core/src/main/scala/org/apache/spark/storage/BlockManager.scala


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/07b3f01f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/07b3f01f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/07b3f01f

Branch: refs/heads/branch-0.8
Commit: 07b3f01f5bf4e2f81ac7abc4906118cf792434e1
Parents: 7e00dee
Author: Matei Zaharia <ma...@eecs.berkeley.edu>
Authored: Sun Oct 20 17:18:06 2013 -0700
Committer: Aaron Davidson <aa...@databricks.com>
Committed: Mon Nov 4 23:32:56 2013 -0800

----------------------------------------------------------------------
 .../org/apache/spark/storage/BlockManager.scala | 458 +++++++------------
 1 file changed, 166 insertions(+), 292 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/07b3f01f/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index ccc05f5..fbedfbc 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -20,7 +20,7 @@ package org.apache.spark.storage
 import java.io.{File, InputStream, OutputStream}
 import java.nio.{ByteBuffer, MappedByteBuffer}
 
-import scala.collection.mutable.{HashMap, ArrayBuffer, HashSet}
+import scala.collection.mutable.{HashMap, ArrayBuffer}
 import scala.util.Random
 
 import akka.actor.{ActorSystem, Cancellable, Props}
@@ -267,89 +267,14 @@ private[spark] class BlockManager(
    */
   def getLocal(blockId: BlockId): Option[Iterator[Any]] = {
     logDebug("Getting local block " + blockId)
-    val info = blockInfo.get(blockId).orNull
-    if (info != null) {
-      info.synchronized {
-
-        // In the another thread is writing the block, wait for it to become ready.
-        if (!info.waitForReady()) {
-          // If we get here, the block write failed.
-          logWarning("Block " + blockId + " was marked as failure.")
-          return None
-        }
-
-        val level = info.level
-        logDebug("Level for block " + blockId + " is " + level)
-
-        // Look for the block in memory
-        if (level.useMemory) {
-          logDebug("Getting block " + blockId + " from memory")
-          memoryStore.getValues(blockId) match {
-            case Some(iterator) =>
-              return Some(iterator)
-            case None =>
-              logDebug("Block " + blockId + " not found in memory")
-          }
-        }
-
-        // Look for block on disk, potentially loading it back into memory if required
-        if (level.useDisk) {
-          logDebug("Getting block " + blockId + " from disk")
-          if (level.useMemory && level.deserialized) {
-            diskStore.getValues(blockId) match {
-              case Some(iterator) =>
-                // Put the block back in memory before returning it
-                // TODO: Consider creating a putValues that also takes in a iterator ?
-                val elements = new ArrayBuffer[Any]
-                elements ++= iterator
-                memoryStore.putValues(blockId, elements, level, true).data match {
-                  case Left(iterator2) =>
-                    return Some(iterator2)
-                  case _ =>
-                    throw new Exception("Memory store did not return back an iterator")
-                }
-              case None =>
-                throw new Exception("Block " + blockId + " not found on disk, though it should be")
-            }
-          } else if (level.useMemory && !level.deserialized) {
-            // Read it as a byte buffer into memory first, then return it
-            diskStore.getBytes(blockId) match {
-              case Some(bytes) =>
-                // Put a copy of the block back in memory before returning it. Note that we can't
-                // put the ByteBuffer returned by the disk store as that's a memory-mapped file.
-                // The use of rewind assumes this.
-                assert (0 == bytes.position())
-                val copyForMemory = ByteBuffer.allocate(bytes.limit)
-                copyForMemory.put(bytes)
-                memoryStore.putBytes(blockId, copyForMemory, level)
-                bytes.rewind()
-                return Some(dataDeserialize(blockId, bytes))
-              case None =>
-                throw new Exception("Block " + blockId + " not found on disk, though it should be")
-            }
-          } else {
-            diskStore.getValues(blockId) match {
-              case Some(iterator) =>
-                return Some(iterator)
-              case None =>
-                throw new Exception("Block " + blockId + " not found on disk, though it should be")
-            }
-          }
-        }
-      }
-    } else {
-      logDebug("Block " + blockId + " not registered locally")
-    }
-    return None
+    doGetLocal(blockId, asValues = true).asInstanceOf[Option[Iterator[Any]]]
   }
 
   /**
    * Get block from the local block manager as serialized bytes.
    */
   def getLocalBytes(blockId: BlockId): Option[ByteBuffer] = {
-    // TODO: This whole thing is very similar to getLocal; we need to refactor it somehow
     logDebug("Getting local block " + blockId + " as bytes")
-
     // As an optimization for map output fetches, if the block is for a shuffle, return it
     // without acquiring a lock; the disk store never deletes (recent) items so this should work
     if (blockId.isShuffle) {
@@ -360,12 +285,15 @@ private[spark] class BlockManager(
           throw new Exception("Block " + blockId + " not found on disk, though it should be")
       }
     }
+    doGetLocal(blockId, asValues = false).asInstanceOf[Option[ByteBuffer]]
+  }
 
+  private def doGetLocal(blockId: BlockId, asValues: Boolean): Option[Any] = {
     val info = blockInfo.get(blockId).orNull
     if (info != null) {
       info.synchronized {
 
-        // In the another thread is writing the block, wait for it to become ready.
+        // If another thread is writing the block, wait for it to become ready.
         if (!info.waitForReady()) {
           // If we get here, the block write failed.
           logWarning("Block " + blockId + " was marked as failure.")
@@ -378,62 +306,104 @@ private[spark] class BlockManager(
         // Look for the block in memory
         if (level.useMemory) {
           logDebug("Getting block " + blockId + " from memory")
-          memoryStore.getBytes(blockId) match {
-            case Some(bytes) =>
-              return Some(bytes)
+          val result = if (asValues) {
+            memoryStore.getValues(blockId)
+          } else {
+            memoryStore.getBytes(blockId)
+          }
+          result match {
+            case Some(values) =>
+              return Some(values)
             case None =>
               logDebug("Block " + blockId + " not found in memory")
           }
         }
 
-        // Look for block on disk
+        // Look for block on disk, potentially storing it back into memory if required:
         if (level.useDisk) {
-          // Read it as a byte buffer into memory first, then return it
-          diskStore.getBytes(blockId) match {
-            case Some(bytes) =>
-              assert (0 == bytes.position())
-              if (level.useMemory) {
-                if (level.deserialized) {
-                  memoryStore.putBytes(blockId, bytes, level)
-                } else {
-                  // The memory store will hang onto the ByteBuffer, so give it a copy instead of
-                  // the memory-mapped file buffer we got from the disk store
-                  val copyForMemory = ByteBuffer.allocate(bytes.limit)
-                  copyForMemory.put(bytes)
-                  memoryStore.putBytes(blockId, copyForMemory, level)
-                }
-              }
-              bytes.rewind()
-              return Some(bytes)
+          logDebug("Getting block " + blockId + " from disk")
+          val bytes: ByteBuffer = diskStore.getBytes(blockId) match {
+            case Some(bytes) => bytes
             case None =>
               throw new Exception("Block " + blockId + " not found on disk, though it should be")
           }
+          assert (0 == bytes.position())
+
+          if (!level.useMemory) {
+            // If the block shouldn't be stored in memory, we can just return it:
+            if (asValues) {
+              return Some(dataDeserialize(blockId, bytes))
+            } else {
+              return Some(bytes)
+            }
+          } else {
+            // Otherwise, we also have to store something in the memory store:
+            if (!level.deserialized || !asValues) {
+              // We'll store the bytes in memory if the block's storage level includes
+              // "memory serialized", or if it should be cached as objects in memory
+              // but we only requested its serialized bytes:
+              val copyForMemory = ByteBuffer.allocate(bytes.limit)
+              copyForMemory.put(bytes)
+              memoryStore.putBytes(blockId, copyForMemory, level)
+              bytes.rewind()
+            }
+            if (!asValues) {
+              return Some(bytes)
+            } else {
+              val values = dataDeserialize(blockId, bytes)
+              if (level.deserialized) {
+                // Cache the values before returning them:
+                // TODO: Consider creating a putValues that also takes in a iterator?
+                val valuesBuffer = new ArrayBuffer[Any]
+                valuesBuffer ++= values
+                memoryStore.putValues(blockId, valuesBuffer, level, true).data match {
+                  case Left(values2) =>
+                    return Some(values2)
+                  case _ =>
+                    throw new Exception("Memory store did not return back an iterator")
+                }
+              } else {
+                return Some(values)
+              }
+            }
+          }
         }
       }
     } else {
       logDebug("Block " + blockId + " not registered locally")
     }
-    return None
+    None
   }
 
   /**
    * Get block from remote block managers.
    */
   def getRemote(blockId: BlockId): Option[Iterator[Any]] = {
-    if (blockId == null) {
-      throw new IllegalArgumentException("Block Id is null")
-    }
     logDebug("Getting remote block " + blockId)
-    // Get locations of block
-    val locations = Random.shuffle(master.getLocations(blockId))
+    doGetRemote(blockId, asValues = true).asInstanceOf[Option[Iterator[Any]]]
+  }
+
+  /**
+   * Get block from remote block managers as serialized bytes.
+   */
+   def getRemoteBytes(blockId: BlockId): Option[ByteBuffer] = {
+    logDebug("Getting remote block " + blockId + " as bytes")
+    doGetRemote(blockId, asValues = false).asInstanceOf[Option[ByteBuffer]]
+   }
 
-    // Get block from remote locations
+  private def doGetRemote(blockId: BlockId, asValues: Boolean): Option[Any] = {
+    require(blockId != null, "BlockId is null")
+    val locations = Random.shuffle(master.getLocations(blockId))
     for (loc <- locations) {
       logDebug("Getting remote block " + blockId + " from " + loc)
       val data = BlockManagerWorker.syncGetBlock(
         GetBlock(blockId), ConnectionManagerId(loc.host, loc.port))
       if (data != null) {
-        return Some(dataDeserialize(blockId, data))
+        if (asValues) {
+          return Some(dataDeserialize(blockId, data))
+        } else {
+          return Some(data)
+        }
       }
       logDebug("The value of block " + blockId + " is null")
     }
@@ -442,31 +412,6 @@ private[spark] class BlockManager(
   }
 
   /**
-   * Get block from remote block managers as serialized bytes.
-   */
-   def getRemoteBytes(blockId: BlockId): Option[ByteBuffer] = {
-     // TODO: As with getLocalBytes, this is very similar to getRemote and perhaps should be
-     // refactored.
-     if (blockId == null) {
-       throw new IllegalArgumentException("Block Id is null")
-     }
-     logDebug("Getting remote block " + blockId + " as bytes")
-     
-     val locations = master.getLocations(blockId)
-     for (loc <- locations) {
-       logDebug("Getting remote block " + blockId + " from " + loc)
-       val data = BlockManagerWorker.syncGetBlock(
-         GetBlock(blockId), ConnectionManagerId(loc.host, loc.port))
-       if (data != null) {
-         return Some(data)
-       }
-       logDebug("The value of block " + blockId + " is null")
-     }
-     logDebug("Block " + blockId + " not found")
-     return None
-   }
-
-  /**
    * Get a block from the block manager (either local or remote).
    */
   def get(blockId: BlockId): Option[Iterator[Any]] = {
@@ -533,17 +478,24 @@ private[spark] class BlockManager(
    * Put a new block of values to the block manager. Returns its (estimated) size in bytes.
    */
   def put(blockId: BlockId, values: ArrayBuffer[Any], level: StorageLevel,
-    tellMaster: Boolean = true) : Long = {
+          tellMaster: Boolean = true) : Long = {
+    require(values != null, "Values is null")
+    doPut(blockId, Left(values), level, tellMaster)
+  }
 
-    if (blockId == null) {
-      throw new IllegalArgumentException("Block Id is null")
-    }
-    if (values == null) {
-      throw new IllegalArgumentException("Values is null")
-    }
-    if (level == null || !level.isValid) {
-      throw new IllegalArgumentException("Storage level is null or invalid")
-    }
+  /**
+   * Put a new block of serialized bytes to the block manager.
+   */
+  def putBytes(blockId: BlockId, bytes: ByteBuffer, level: StorageLevel,
+               tellMaster: Boolean = true) {
+    require(bytes != null, "Bytes is null")
+    doPut(blockId, Right(bytes), level, tellMaster)
+  }
+
+  private def doPut(blockId: BlockId, data: Either[ArrayBuffer[Any], ByteBuffer],
+                    level: StorageLevel, tellMaster: Boolean = true): Long = {
+    require(blockId != null, "BlockId is null")
+    require(level != null && level.isValid, "StorageLevel is null or invalid")
 
     // Remember the block's storage level so that we can correctly drop it to disk if it needs
     // to be dropped right after it got put into memory. Note, however, that other threads will
@@ -559,7 +511,8 @@ private[spark] class BlockManager(
           return oldBlockOpt.get.size
         }
 
-        // TODO: So the block info exists - but previous attempt to load it (?) failed. What do we do now ? Retry on it ?
+        // TODO: So the block info exists - but previous attempt to load it (?) failed.
+        // What do we do now ? Retry on it ?
         oldBlockOpt.get
       } else {
         tinfo
@@ -568,10 +521,10 @@ private[spark] class BlockManager(
 
     val startTimeMs = System.currentTimeMillis
 
-    // If we need to replicate the data, we'll want access to the values, but because our
-    // put will read the whole iterator, there will be no values left. For the case where
-    // the put serializes data, we'll remember the bytes, above; but for the case where it
-    // doesn't, such as deserialized storage, let's rely on the put returning an Iterator.
+    // If we're storing values and we need to replicate the data, we'll want access to the values,
+    // but because our put will read the whole iterator, there will be no values left. For the
+    // case where the put serializes data, we'll remember the bytes, above; but for the case where
+    // it doesn't, such as deserialized storage, let's rely on the put returning an Iterator.
     var valuesAfterPut: Iterator[Any] = null
 
     // Ditto for the bytes after the put
@@ -580,30 +533,51 @@ private[spark] class BlockManager(
     // Size of the block in bytes (to return to caller)
     var size = 0L
 
+    // If we're storing bytes, then initiate the replication before storing them locally.
+    // This is faster as data is already serialized and ready to send.
+    val replicationFuture = if (data.isRight && level.replication > 1) {
+      val bufferView = data.right.get.duplicate() // Doesn't copy the bytes, just creates a wrapper
+      Future {
+        replicate(blockId, bufferView, level)
+      }
+    } else {
+      null
+    }
+
     myInfo.synchronized {
       logTrace("Put for block " + blockId + " took " + Utils.getUsedTimeMs(startTimeMs)
         + " to get into synchronized block")
 
       var marked = false
       try {
-        if (level.useMemory) {
-          // Save it just to memory first, even if it also has useDisk set to true; we will later
-          // drop it to disk if the memory store can't hold it.
-          val res = memoryStore.putValues(blockId, values, level, true)
-          size = res.size
-          res.data match {
-            case Right(newBytes) => bytesAfterPut = newBytes
-            case Left(newIterator) => valuesAfterPut = newIterator
+        data match {
+          case Left(values) => {
+            if (level.useMemory) {
+              // Save it just to memory first, even if it also has useDisk set to true; we will
+              // drop it to disk later if the memory store can't hold it.
+              val res = memoryStore.putValues(blockId, values, level, true)
+              size = res.size
+              res.data match {
+                case Right(newBytes) => bytesAfterPut = newBytes
+                case Left(newIterator) => valuesAfterPut = newIterator
+              }
+            } else {
+              // Save directly to disk.
+              // Don't get back the bytes unless we replicate them.
+              val askForBytes = level.replication > 1
+              val res = diskStore.putValues(blockId, values, level, askForBytes)
+              size = res.size
+              res.data match {
+                case Right(newBytes) => bytesAfterPut = newBytes
+                case _ =>
+              }
+            }
           }
-        } else {
-          // Save directly to disk.
-          // Don't get back the bytes unless we replicate them.
-          val askForBytes = level.replication > 1
-          val res = diskStore.putValues(blockId, values, level, askForBytes)
-          size = res.size
-          res.data match {
-            case Right(newBytes) => bytesAfterPut = newBytes
-            case _ =>
+          case Right(bytes) => {
+            bytes.rewind()
+            // Store it only in memory at first, even if useDisk is also set to true
+            (if (level.useMemory) memoryStore else diskStore).putBytes(blockId, bytes, level)
+            size = bytes.limit
           }
         }
 
@@ -628,125 +602,39 @@ private[spark] class BlockManager(
     }
     logDebug("Put block " + blockId + " locally took " + Utils.getUsedTimeMs(startTimeMs))
 
-    // Replicate block if required
+    // Either we're storing bytes and we asynchronously started replication, or we're storing
+    // values and need to serialize and replicate them now:
     if (level.replication > 1) {
-      val remoteStartTime = System.currentTimeMillis
-      // Serialize the block if not already done
-      if (bytesAfterPut == null) {
-        if (valuesAfterPut == null) {
-          throw new SparkException(
-            "Underlying put returned neither an Iterator nor bytes! This shouldn't happen.")
-        }
-        bytesAfterPut = dataSerialize(blockId, valuesAfterPut)
-      }
-      replicate(blockId, bytesAfterPut, level)
-      logDebug("Put block " + blockId + " remotely took " + Utils.getUsedTimeMs(remoteStartTime))
-    }
-    BlockManager.dispose(bytesAfterPut)
-
-    return size
-  }
-
-
-  /**
-   * Put a new block of serialized bytes to the block manager.
-   */
-  def putBytes(
-    blockId: BlockId, bytes: ByteBuffer, level: StorageLevel, tellMaster: Boolean = true) {
-
-    if (blockId == null) {
-      throw new IllegalArgumentException("Block Id is null")
-    }
-    if (bytes == null) {
-      throw new IllegalArgumentException("Bytes is null")
-    }
-    if (level == null || !level.isValid) {
-      throw new IllegalArgumentException("Storage level is null or invalid")
-    }
-
-    // Remember the block's storage level so that we can correctly drop it to disk if it needs
-    // to be dropped right after it got put into memory. Note, however, that other threads will
-    // not be able to get() this block until we call markReady on its BlockInfo.
-    val myInfo = {
-      val tinfo = new BlockInfoImpl(level, tellMaster)
-      // Do atomically !
-      val oldBlockOpt = blockInfo.putIfAbsent(blockId, tinfo)
-
-      if (oldBlockOpt.isDefined) {
-        if (oldBlockOpt.get.waitForReady()) {
-          logWarning("Block " + blockId + " already exists on this machine; not re-adding it")
-          return
-        }
-
-        // TODO: So the block info exists - but previous attempt to load it (?) failed. What do we do now ? Retry on it ?
-        oldBlockOpt.get
-      } else {
-        tinfo
-      }
-    }
-
-    val startTimeMs = System.currentTimeMillis
-
-    // Initiate the replication before storing it locally. This is faster as
-    // data is already serialized and ready for sending
-    val replicationFuture = if (level.replication > 1) {
-      val bufferView = bytes.duplicate() // Doesn't copy the bytes, just creates a wrapper
-      Future {
-        replicate(blockId, bufferView, level)
-      }
-    } else {
-      null
-    }
-
-    myInfo.synchronized {
-      logDebug("PutBytes for block " + blockId + " took " + Utils.getUsedTimeMs(startTimeMs)
-        + " to get into synchronized block")
-
-      var marked = false
-      try {
-        if (level.useMemory) {
-          // Store it only in memory at first, even if useDisk is also set to true
-          bytes.rewind()
-          memoryStore.putBytes(blockId, bytes, level)
-        } else {
-          bytes.rewind()
-          diskStore.putBytes(blockId, bytes, level)
-        }
-
-        // assert (0 == bytes.position(), "" + bytes)
-
-        // Now that the block is in either the memory or disk store, let other threads read it,
-        // and tell the master about it.
-        marked = true
-        myInfo.markReady(bytes.limit)
-        if (tellMaster) {
-          reportBlockStatus(blockId, myInfo)
-        }
-      } finally {
-        // If we failed at putting the block to memory/disk, notify other possible readers
-        // that it has failed, and then remove it from the block info map.
-        if (! marked) {
-          // Note that the remove must happen before markFailure otherwise another thread
-          // could've inserted a new BlockInfo before we remove it.
-          blockInfo.remove(blockId)
-          myInfo.markFailure()
-          logWarning("Putting block " + blockId + " failed")
+      data match {
+        case Right(bytes) => Await.ready(replicationFuture, Duration.Inf)
+        case Left(values) => {
+          val remoteStartTime = System.currentTimeMillis
+          // Serialize the block if not already done
+          if (bytesAfterPut == null) {
+            if (valuesAfterPut == null) {
+              throw new SparkException(
+                "Underlying put returned neither an Iterator nor bytes! This shouldn't happen.")
+            }
+            bytesAfterPut = dataSerialize(blockId, valuesAfterPut)
+          }
+          replicate(blockId, bytesAfterPut, level)
+          logDebug("Put block " + blockId + " remotely took " +
+            Utils.getUsedTimeMs(remoteStartTime))
         }
       }
     }
 
-    // If replication had started, then wait for it to finish
-    if (level.replication > 1) {
-      Await.ready(replicationFuture, Duration.Inf)
-    }
+    BlockManager.dispose(bytesAfterPut)
 
     if (level.replication > 1) {
-      logDebug("PutBytes for block " + blockId + " with replication took " +
+      logDebug("Put for block " + blockId + " with replication took " +
         Utils.getUsedTimeMs(startTimeMs))
     } else {
-      logDebug("PutBytes for block " + blockId + " without replication took " +
+      logDebug("Put for block " + blockId + " without replication took " +
         Utils.getUsedTimeMs(startTimeMs))
     }
+
+    size
   }
 
   /**
@@ -871,34 +759,20 @@ private[spark] class BlockManager(
 
   private def dropOldNonBroadcastBlocks(cleanupTime: Long) {
     logInfo("Dropping non broadcast blocks older than " + cleanupTime)
-    val iterator = blockInfo.internalMap.entrySet().iterator()
-    while (iterator.hasNext) {
-      val entry = iterator.next()
-      val (id, info, time) = (entry.getKey, entry.getValue._1, entry.getValue._2)
-      if (time < cleanupTime && !id.isBroadcast) {
-        info.synchronized {
-          val level = info.level
-          if (level.useMemory) {
-            memoryStore.remove(id)
-          }
-          if (level.useDisk) {
-            diskStore.remove(id)
-          }
-          iterator.remove()
-          logInfo("Dropped block " + id)
-        }
-        reportBlockStatus(id, info)
-      }
-    }
+    dropOldBlocks(cleanupTime, !_.isBroadcast)
   }
 
   private def dropOldBroadcastBlocks(cleanupTime: Long) {
     logInfo("Dropping broadcast blocks older than " + cleanupTime)
+    dropOldBlocks(cleanupTime, _.isBroadcast)
+  }
+
+  private def dropOldBlocks(cleanupTime: Long, shouldDrop: (BlockId => Boolean)) {
     val iterator = blockInfo.internalMap.entrySet().iterator()
     while (iterator.hasNext) {
       val entry = iterator.next()
       val (id, info, time) = (entry.getKey, entry.getValue._1, entry.getValue._2)
-      if (time < cleanupTime && id.isBroadcast) {
+      if (time < cleanupTime && shouldDrop(id)) {
         info.synchronized {
           val level = info.level
           if (level.useMemory) {


[2/3] git commit: Never store shuffle blocks in BlockManager

Posted by rx...@apache.org.
Never store shuffle blocks in BlockManager

After the BlockId refactor (PR #114), it became very clear that ShuffleBlocks are of no use
within BlockManager (they had a no-arg constructor!). This patch completely eliminates
them, saving us around 100-150 bytes per shuffle block.
The total, system-wide overhead per shuffle block is now a flat 8 bytes, excluding
state saved by the MapOutputTracker.
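
For context on where the old per-block bytes went: the removed ShuffleBlockInfo (see the BlockInfo.scala hunk below) defined its constant properties with 'def' rather than 'val' so the compiler would not emit per-instance fields. A minimal standalone sketch of that distinction (Info, PerInstance, and Constant are hypothetical types, simplified to plain Strings):

    trait Info {
      def level: String
      def tellMaster: Boolean
    }

    // Every instance carries two fields (a reference and a boolean),
    // because vals are stored per instance:
    class PerInstance(val level: String, val tellMaster: Boolean) extends Info

    // No fields at all: these defs compile to methods returning constants,
    // so an instance is just an object header. The removed ShuffleBlockInfo
    // relied on this trick; now that shuffle blocks are never registered in
    // the BlockManager, even that per-instance overhead is gone.
    class Constant extends Info {
      def level: String = "DISK_ONLY"
      def tellMaster: Boolean = false
    }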


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/7eaa4617
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/7eaa4617
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/7eaa4617

Branch: refs/heads/branch-0.8
Commit: 7eaa461767105485228564c535967296f1991b11
Parents: 07b3f01
Author: Aaron Davidson <aa...@databricks.com>
Authored: Fri Nov 1 19:25:23 2013 -0700
Committer: Aaron Davidson <aa...@databricks.com>
Committed: Mon Nov 4 23:44:03 2013 -0800

----------------------------------------------------------------------
 .../org/apache/spark/storage/BlockInfo.scala      | 18 +-----------------
 .../org/apache/spark/storage/BlockManager.scala   | 10 ++--------
 .../apache/spark/storage/BlockObjectWriter.scala  | 12 +-----------
 3 files changed, 4 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/7eaa4617/core/src/main/scala/org/apache/spark/storage/BlockInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockInfo.scala b/core/src/main/scala/org/apache/spark/storage/BlockInfo.scala
index dbe0bda..c8f3976 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockInfo.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockInfo.scala
@@ -19,9 +19,7 @@ package org.apache.spark.storage
 
 import java.util.concurrent.ConcurrentHashMap
 
-private[storage] trait BlockInfo {
-  def level: StorageLevel
-  def tellMaster: Boolean
+private[storage] class BlockInfo(val level: StorageLevel, val tellMaster: Boolean) {
   // To save space, 'pending' and 'failed' are encoded as special sizes:
   @volatile var size: Long = BlockInfo.BLOCK_PENDING
   private def pending: Boolean = size == BlockInfo.BLOCK_PENDING
@@ -81,17 +79,3 @@ private object BlockInfo {
   private val BLOCK_PENDING: Long = -1L
   private val BLOCK_FAILED: Long = -2L
 }
-
-// All shuffle blocks have the same `level` and `tellMaster` properties,
-// so we can save space by not storing them in each instance:
-private[storage] class ShuffleBlockInfo extends BlockInfo {
-  // These need to be defined using 'def' instead of 'val' in order for
-  // the compiler to eliminate the fields:
-  def level: StorageLevel = StorageLevel.DISK_ONLY
-  def tellMaster: Boolean = false
-}
-
-private[storage] class BlockInfoImpl(val level: StorageLevel, val tellMaster: Boolean)
-  extends BlockInfo {
-  // Intentionally left blank
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/7eaa4617/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index fbedfbc..a34c95b 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -465,13 +465,7 @@ private[spark] class BlockManager(
   def getDiskWriter(blockId: BlockId, file: File, serializer: Serializer, bufferSize: Int)
     : BlockObjectWriter = {
     val compressStream: OutputStream => OutputStream = wrapForCompression(blockId, _)
-    val writer = new DiskBlockObjectWriter(blockId, file, serializer, bufferSize, compressStream)
-    writer.registerCloseEventHandler(() => {
-      val myInfo = new ShuffleBlockInfo()
-      blockInfo.put(blockId, myInfo)
-      myInfo.markReady(writer.fileSegment().length)
-    })
-    writer
+    new DiskBlockObjectWriter(blockId, file, serializer, bufferSize, compressStream)
   }
 
   /**
@@ -501,7 +495,7 @@ private[spark] class BlockManager(
     // to be dropped right after it got put into memory. Note, however, that other threads will
     // not be able to get() this block until we call markReady on its BlockInfo.
     val myInfo = {
-      val tinfo = new BlockInfoImpl(level, tellMaster)
+      val tinfo = new BlockInfo(level, tellMaster)
       // Do atomically !
       val oldBlockOpt = blockInfo.putIfAbsent(blockId, tinfo)
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/7eaa4617/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
index e49c191..469e68f 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
@@ -34,20 +34,12 @@ import org.apache.spark.serializer.{SerializationStream, Serializer}
  */
 abstract class BlockObjectWriter(val blockId: BlockId) {
 
-  var closeEventHandler: () => Unit = _
-
   def open(): BlockObjectWriter
 
-  def close() {
-    closeEventHandler()
-  }
+  def close()
 
   def isOpen: Boolean
 
-  def registerCloseEventHandler(handler: () => Unit) {
-    closeEventHandler = handler
-  }
-
   /**
    * Flush the partial writes and commit them as a single atomic block. Return the
    * number of bytes written for this commit.
@@ -146,8 +138,6 @@ class DiskBlockObjectWriter(
       ts = null
       objOut = null
     }
-    // Invoke the close callback handler.
-    super.close()
   }
 
   override def isOpen: Boolean = objOut != null


[3/3] git commit: Merge pull request #140 from aarondav/merge-75

Posted by rx...@apache.org.
Merge pull request #140 from aarondav/merge-75

Manually merge BlockManager refactor #75

PR #75 was unfortunately not cherry-picked into 0.8.0, which broke the build earlier and has now broken it again as later changes were cherry-picked. This merges #75 and replays #139 on top of it. I have confirmed that #75 makes exactly the same changes here as it did on master. As long as all BlockManager patches from master were meant to go into branch-0.8, this PR should do the right thing.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/96670e71
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/96670e71
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/96670e71

Branch: refs/heads/branch-0.8
Commit: 96670e71679dedc2857d430a018f5b9505946bdd
Parents: 0848167 7eaa461
Author: Reynold Xin <rx...@apache.org>
Authored: Tue Nov 5 10:43:10 2013 -0800
Committer: Reynold Xin <rx...@apache.org>
Committed: Tue Nov 5 10:43:10 2013 -0800

----------------------------------------------------------------------
 .../org/apache/spark/storage/BlockManager.scala | 458 +++++++------------
 1 file changed, 166 insertions(+), 292 deletions(-)
----------------------------------------------------------------------