Posted to commits@spark.apache.org by rx...@apache.org on 2014/06/21 02:14:39 UTC
git commit: Clean up CacheManager et al.
Repository: spark
Updated Branches:
refs/heads/master 0ac71d128 -> 01125a116
Clean up CacheManager et al.
**UPDATE**
I have removed the special handling for `StorageLevel.MEMORY_*_SER` for now, because it introduces a potential performance regression. With the latest changes, this PR contains mainly style (code readability) fixes. The only functionality change is that `MemoryStore#putBytes` now actually returns updated blocks, though this is a minor bug fix.
Once again, this is mainly a precursor to another PR.
---------
*Old comment*
The deserialized version of a partition may occupy much more space than the serialized version. Therefore, if a partition is to be cached with `StorageLevel.MEMORY_*_SER`, we don't need to fully unroll it into an `ArrayBuffer`, but instead we can unroll it into a potentially much smaller `ByteBuffer`. This may save us from OOMs in this case.
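To illustrate the abandoned idea above, here is a minimal sketch, not code from this commit (the helper name and the use of plain Java serialization are assumptions; a real implementation would go through Spark's configured serializer):

    import java.io.{ByteArrayOutputStream, ObjectOutputStream}
    import java.nio.ByteBuffer

    // Hypothetical sketch: stream a partition's elements straight into a
    // serialized buffer, so the fully deserialized ArrayBuffer form never
    // needs to be held in memory all at once. Assumes Serializable elements.
    def unrollToSerializedBytes(values: Iterator[AnyRef]): ByteBuffer = {
      val byteStream = new ByteArrayOutputStream()
      val out = new ObjectOutputStream(byteStream)
      values.foreach(v => out.writeObject(v))
      out.close()
      ByteBuffer.wrap(byteStream.toByteArray)
    }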
Author: Andrew Or <an...@gmail.com>
Closes #1083 from andrewor14/unroll-them-partitions and squashes the following commits:
7048aa0 [Andrew Or] Merge branch 'master' of github.com:apache/spark into unroll-them-partitions
3d9a366 [Andrew Or] Minor change for readability
d12b95f [Andrew Or] Remove unused imports (minor)
a4c387b [Andrew Or] Merge branch 'master' of github.com:apache/spark into unroll-them-partitions
cf5f565 [Andrew Or] Remove special handling for MEM_*_SER
0091ec0 [Andrew Or] Address review feedback
44ef282 [Andrew Or] Actually return updated blocks in putBytes
2941c89 [Andrew Or] Clean up BlockStore (minor)
a8f181d [Andrew Or] Add special handling for StorageLevel.MEMORY_*_SER
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/01125a11
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/01125a11
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/01125a11
Branch: refs/heads/master
Commit: 01125a1162b7bf42ae907cb6616616cc4ffb6fa9
Parents: 0ac71d1
Author: Andrew Or <an...@gmail.com>
Authored: Fri Jun 20 17:14:33 2014 -0700
Committer: Reynold Xin <rx...@apache.org>
Committed: Fri Jun 20 17:14:33 2014 -0700
----------------------------------------------------------------------
.../scala/org/apache/spark/CacheManager.scala | 163 +++++++++++--------
.../org/apache/spark/scheduler/ResultTask.scala | 3 +-
.../apache/spark/scheduler/ShuffleMapTask.scala | 5 +-
.../org/apache/spark/storage/BlockStore.scala | 22 ++-
.../org/apache/spark/storage/MemoryStore.scala | 8 +-
5 files changed, 113 insertions(+), 88 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/01125a11/core/src/main/scala/org/apache/spark/CacheManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala
index 315ed91..3f667a4 100644
--- a/core/src/main/scala/org/apache/spark/CacheManager.scala
+++ b/core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -20,25 +20,25 @@ package org.apache.spark
import scala.collection.mutable.{ArrayBuffer, HashSet}
import org.apache.spark.rdd.RDD
-import org.apache.spark.storage.{BlockId, BlockManager, BlockStatus, RDDBlockId, StorageLevel}
+import org.apache.spark.storage._
/**
- * Spark class responsible for passing RDDs split contents to the BlockManager and making
+ * Spark class responsible for passing RDD partition contents to the BlockManager and making
* sure a node doesn't load two copies of an RDD at once.
*/
private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
- /** Keys of RDD splits that are being computed/loaded. */
+ /** Keys of RDD partitions that are being computed/loaded. */
private val loading = new HashSet[RDDBlockId]()
- /** Gets or computes an RDD split. Used by RDD.iterator() when an RDD is cached. */
+ /** Gets or computes an RDD partition. Used by RDD.iterator() when an RDD is cached. */
def getOrCompute[T](
rdd: RDD[T],
- split: Partition,
+ partition: Partition,
context: TaskContext,
storageLevel: StorageLevel): Iterator[T] = {
- val key = RDDBlockId(rdd.id, split.index)
+ val key = RDDBlockId(rdd.id, partition.index)
logDebug(s"Looking for partition $key")
blockManager.get(key) match {
case Some(values) =>
@@ -46,79 +46,28 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
new InterruptibleIterator(context, values.asInstanceOf[Iterator[T]])
case None =>
- // Mark the split as loading (unless someone else marks it first)
- loading.synchronized {
- if (loading.contains(key)) {
- logInfo(s"Another thread is loading $key, waiting for it to finish...")
- while (loading.contains(key)) {
- try {
- loading.wait()
- } catch {
- case e: Exception =>
- logWarning(s"Got an exception while waiting for another thread to load $key", e)
- }
- }
- logInfo(s"Finished waiting for $key")
- /* See whether someone else has successfully loaded it. The main way this would fail
- * is for the RDD-level cache eviction policy if someone else has loaded the same RDD
- * partition but we didn't want to make space for it. However, that case is unlikely
- * because it's unlikely that two threads would work on the same RDD partition. One
- * downside of the current code is that threads wait serially if this does happen. */
- blockManager.get(key) match {
- case Some(values) =>
- return new InterruptibleIterator(context, values.asInstanceOf[Iterator[T]])
- case None =>
- logInfo(s"Whoever was loading $key failed; we'll try it ourselves")
- loading.add(key)
- }
- } else {
- loading.add(key)
- }
+ // Acquire a lock for loading this partition
+ // If another thread already holds the lock, wait for it to finish and return its results
+ val storedValues = acquireLockForPartition[T](key)
+ if (storedValues.isDefined) {
+ return new InterruptibleIterator[T](context, storedValues.get)
}
+
+ // Otherwise, we have to load the partition ourselves
try {
- // If we got here, we have to load the split
logInfo(s"Partition $key not found, computing it")
- val computedValues = rdd.computeOrReadCheckpoint(split, context)
+ val computedValues = rdd.computeOrReadCheckpoint(partition, context)
- // Persist the result, so long as the task is not running locally
+ // If the task is running locally, do not persist the result
if (context.runningLocally) {
return computedValues
}
- // Keep track of blocks with updated statuses
- var updatedBlocks = Seq[(BlockId, BlockStatus)]()
- val returnValue: Iterator[T] = {
- if (storageLevel.useDisk && !storageLevel.useMemory) {
- /* In the case that this RDD is to be persisted using DISK_ONLY
- * the iterator will be passed directly to the blockManager (rather then
- * caching it to an ArrayBuffer first), then the resulting block data iterator
- * will be passed back to the user. If the iterator generates a lot of data,
- * this means that it doesn't all have to be held in memory at one time.
- * This could also apply to MEMORY_ONLY_SER storage, but we need to make sure
- * blocks aren't dropped by the block store before enabling that. */
- updatedBlocks = blockManager.put(key, computedValues, storageLevel, tellMaster = true)
- blockManager.get(key) match {
- case Some(values) =>
- values.asInstanceOf[Iterator[T]]
- case None =>
- logInfo(s"Failure to store $key")
- throw new SparkException("Block manager failed to return persisted value")
- }
- } else {
- // In this case the RDD is cached to an array buffer. This will save the results
- // if we're dealing with a 'one-time' iterator
- val elements = new ArrayBuffer[Any]
- elements ++= computedValues
- updatedBlocks = blockManager.put(key, elements, storageLevel, tellMaster = true)
- elements.iterator.asInstanceOf[Iterator[T]]
- }
- }
-
- // Update task metrics to include any blocks whose storage status is updated
- val metrics = context.taskMetrics
- metrics.updatedBlocks = Some(updatedBlocks)
-
- new InterruptibleIterator(context, returnValue)
+ // Otherwise, cache the values and keep track of any updates in block statuses
+ val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
+ val cachedValues = putInBlockManager(key, computedValues, storageLevel, updatedBlocks)
+ context.taskMetrics.updatedBlocks = Some(updatedBlocks)
+ new InterruptibleIterator(context, cachedValues)
} finally {
loading.synchronized {
@@ -128,4 +77,76 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
}
}
}
+
+ /**
+ * Acquire a loading lock for the partition identified by the given block ID.
+ *
+ * If the lock is free, just acquire it and return None. Otherwise, another thread is already
+ * loading the partition, so we wait for it to finish and return the values loaded by the thread.
+ */
+ private def acquireLockForPartition[T](id: RDDBlockId): Option[Iterator[T]] = {
+ loading.synchronized {
+ if (!loading.contains(id)) {
+ // If the partition is free, acquire its lock to compute its value
+ loading.add(id)
+ None
+ } else {
+ // Otherwise, wait for another thread to finish and return its result
+ logInfo(s"Another thread is loading $id, waiting for it to finish...")
+ while (loading.contains(id)) {
+ try {
+ loading.wait()
+ } catch {
+ case e: Exception =>
+ logWarning(s"Exception while waiting for another thread to load $id", e)
+ }
+ }
+ logInfo(s"Finished waiting for $id")
+ val values = blockManager.get(id)
+ if (!values.isDefined) {
+ /* The block is not guaranteed to exist even after the other thread has finished.
+ * For instance, the block could be evicted after it was put, but before our get.
+ * In this case, we still need to load the partition ourselves. */
+ logInfo(s"Whoever was loading $id failed; we'll try it ourselves")
+ loading.add(id)
+ }
+ values.map(_.asInstanceOf[Iterator[T]])
+ }
+ }
+ }
+
+ /**
+ * Cache the values of a partition, keeping track of any updates in the storage statuses
+ * of other blocks along the way.
+ */
+ private def putInBlockManager[T](
+ key: BlockId,
+ values: Iterator[T],
+ storageLevel: StorageLevel,
+ updatedBlocks: ArrayBuffer[(BlockId, BlockStatus)]): Iterator[T] = {
+
+ if (!storageLevel.useMemory) {
+ /* This RDD is not to be cached in memory, so we can just pass the computed values
+ * as an iterator directly to the BlockManager, rather than first fully unrolling
+ * it in memory. The latter option potentially uses much more memory and risks OOM
+ * exceptions that can be avoided. */
+ updatedBlocks ++= blockManager.put(key, values, storageLevel, tellMaster = true)
+ blockManager.get(key) match {
+ case Some(v) => v.asInstanceOf[Iterator[T]]
+ case None =>
+ logInfo(s"Failure to store $key")
+ throw new BlockException(key, s"Block manager failed to return cached value for $key!")
+ }
+ } else {
+ /* This RDD is to be cached in memory. In this case we cannot pass the computed values
+ * to the BlockManager as an iterator and expect to read it back later. This is because
+ * we may end up dropping a partition from memory store before getting it back, e.g.
+ * when the entirety of the RDD does not fit in memory. */
+ val elements = new ArrayBuffer[Any]
+ elements ++= values
+ updatedBlocks ++= blockManager.put(key, elements, storageLevel, tellMaster = true)
+ elements.iterator.asInstanceOf[Iterator[T]]
+ }
+ }
+
}
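Aside: the acquireLockForPartition logic above amounts to a wait/notify handshake over a shared set of in-flight keys. A condensed standalone sketch of that pattern follows; LoadingLocks and its store parameter are illustrative stand-ins for CacheManager and blockManager.get, not Spark API:

    import scala.collection.mutable

    // The HashSet of in-flight keys doubles as the monitor. Waiters block
    // until the loader removes the key and calls notifyAll(); if the loaded
    // value has since disappeared (e.g. evicted), the waiter takes over.
    class LoadingLocks[K, V](store: K => Option[V]) {
      private val loading = new mutable.HashSet[K]()

      /** Some(value) if another thread loaded it; None if we now hold the lock. */
      def acquire(key: K): Option[V] = loading.synchronized {
        if (loading.add(key)) {
          None  // lock acquired; caller computes the value, then calls release(key)
        } else {
          while (loading.contains(key)) {
            loading.wait()
          }
          val loaded = store(key)
          if (loaded.isEmpty) {
            loading.add(key)  // the other thread's result is gone; take over the lock
          }
          loaded
        }
      }

      def release(key: K): Unit = loading.synchronized {
        loading.remove(key)
        loading.notifyAll()
      }
    }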
http://git-wip-us.apache.org/repos/asf/spark/blob/01125a11/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
index 0e8d551..bbf9f73 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
@@ -17,11 +17,12 @@
package org.apache.spark.scheduler
+import scala.language.existentials
+
import java.io._
import java.util.zip.{GZIPInputStream, GZIPOutputStream}
import scala.collection.mutable.HashMap
-import scala.language.existentials
import org.apache.spark._
import org.apache.spark.rdd.{RDD, RDDCheckpointData}
http://git-wip-us.apache.org/repos/asf/spark/blob/01125a11/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
index 0098b5a..859cdc5 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
@@ -25,10 +25,7 @@ import java.util.zip.{GZIPInputStream, GZIPOutputStream}
import scala.collection.mutable.HashMap
import org.apache.spark._
-import org.apache.spark.executor.ShuffleWriteMetrics
import org.apache.spark.rdd.{RDD, RDDCheckpointData}
-import org.apache.spark.serializer.Serializer
-import org.apache.spark.storage._
import org.apache.spark.shuffle.ShuffleWriter
private[spark] object ShuffleMapTask {
@@ -150,7 +147,7 @@ private[spark] class ShuffleMapTask(
for (elem <- rdd.iterator(split, context)) {
writer.write(elem.asInstanceOf[Product2[Any, Any]])
}
- return writer.stop(success = true).get
+ writer.stop(success = true).get
} catch {
case e: Exception =>
if (writer != null) {
http://git-wip-us.apache.org/repos/asf/spark/blob/01125a11/core/src/main/scala/org/apache/spark/storage/BlockStore.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockStore.scala b/core/src/main/scala/org/apache/spark/storage/BlockStore.scala
index 9a9be04..b9b53b1 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockStore.scala
@@ -24,11 +24,11 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.spark.Logging
/**
- * Abstract class to store blocks
+ * Abstract class to store blocks.
*/
-private[spark]
-abstract class BlockStore(val blockManager: BlockManager) extends Logging {
- def putBytes(blockId: BlockId, bytes: ByteBuffer, level: StorageLevel) : PutResult
+private[spark] abstract class BlockStore(val blockManager: BlockManager) extends Logging {
+
+ def putBytes(blockId: BlockId, bytes: ByteBuffer, level: StorageLevel): PutResult
/**
* Put in a block and, possibly, also return its content as either bytes or another Iterator.
@@ -37,11 +37,17 @@ abstract class BlockStore(val blockManager: BlockManager) extends Logging {
* @return a PutResult that contains the size of the data, as well as the values put if
* returnValues is true (if not, the result's data field can be null)
*/
- def putValues(blockId: BlockId, values: Iterator[Any], level: StorageLevel,
- returnValues: Boolean) : PutResult
+ def putValues(
+ blockId: BlockId,
+ values: Iterator[Any],
+ level: StorageLevel,
+ returnValues: Boolean): PutResult
- def putValues(blockId: BlockId, values: ArrayBuffer[Any], level: StorageLevel,
- returnValues: Boolean) : PutResult
+ def putValues(
+ blockId: BlockId,
+ values: ArrayBuffer[Any],
+ level: StorageLevel,
+ returnValues: Boolean): PutResult
/**
* Return the size of a block in bytes.
http://git-wip-us.apache.org/repos/asf/spark/blob/01125a11/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
index 084a566..71f66c8 100644
--- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
@@ -58,11 +58,11 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
val elements = new ArrayBuffer[Any]
elements ++= values
val sizeEstimate = SizeEstimator.estimate(elements.asInstanceOf[AnyRef])
- tryToPut(blockId, elements, sizeEstimate, true)
- PutResult(sizeEstimate, Left(values.toIterator))
+ val putAttempt = tryToPut(blockId, elements, sizeEstimate, deserialized = true)
+ PutResult(sizeEstimate, Left(values.toIterator), putAttempt.droppedBlocks)
} else {
- tryToPut(blockId, bytes, bytes.limit, false)
- PutResult(bytes.limit(), Right(bytes.duplicate()))
+ val putAttempt = tryToPut(blockId, bytes, bytes.limit, deserialized = false)
+ PutResult(bytes.limit(), Right(bytes.duplicate()), putAttempt.droppedBlocks)
}
}
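For context on the putBytes fix above: tryToPut may drop other blocks to make room, and before this change those evictions were not surfaced in the returned PutResult. A condensed sketch of the caller-visible effect (the case class shape is simplified from the real PutResult, and the string type aliases are stand-ins for BlockId and BlockStatus):

    import java.nio.ByteBuffer
    import scala.collection.mutable.ArrayBuffer

    type BlockId = String      // stand-in for org.apache.spark.storage.BlockId
    type BlockStatus = String  // stand-in for org.apache.spark.storage.BlockStatus

    // Simplified shape of PutResult after the fix: droppedBlocks carries any
    // blocks evicted while making room for this one.
    case class PutResult(
        size: Long,
        data: Either[Iterator[Any], ByteBuffer],
        droppedBlocks: Seq[(BlockId, BlockStatus)] = Seq.empty)

    // Callers tracking storage-status updates can now fold evictions in,
    // where previously they were silently omitted.
    def recordPut(result: PutResult, updatedBlocks: ArrayBuffer[(BlockId, BlockStatus)]): Unit = {
      updatedBlocks ++= result.droppedBlocks
    }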