You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2018/01/26 11:18:52 UTC
spark git commit: [SPARK-22068][CORE] Reduce the duplicate code
between putIteratorAsValues and putIteratorAsBytes
Repository: spark
Updated Branches:
refs/heads/master c22eaa94e -> 3e2525147
[SPARK-22068][CORE] Reduce the duplicate code between putIteratorAsValues and putIteratorAsBytes
## What changes were proposed in this pull request?
The code logic between `MemoryStore.putIteratorAsValues` and `MemoryStore.putIteratorAsBytes` is almost the same, so we should reduce the duplicate code between them.
## How was this patch tested?
Existing UT.
Author: Xianyang Liu <xi...@intel.com>
Closes #19285 from ConeyLiu/rmemorystore.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3e252514
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3e252514
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3e252514
Branch: refs/heads/master
Commit: 3e252514741447004f3c18ddd77c617b4e37cfaa
Parents: c22eaa9
Author: Xianyang Liu <xi...@intel.com>
Authored: Fri Jan 26 19:18:18 2018 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Fri Jan 26 19:18:18 2018 +0800
----------------------------------------------------------------------
.../spark/storage/memory/MemoryStore.scala | 336 ++++++++++---------
1 file changed, 178 insertions(+), 158 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/3e252514/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
index 17f7a69..4cc5bcb 100644
--- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
@@ -162,7 +162,7 @@ private[spark] class MemoryStore(
}
/**
- * Attempt to put the given block in memory store as values.
+ * Attempt to put the given block in memory store as values or bytes.
*
* It's possible that the iterator is too large to materialize and store in memory. To avoid
* OOM exceptions, this method will gradually unroll the iterator while periodically checking
@@ -170,18 +170,24 @@ private[spark] class MemoryStore(
* temporary unroll memory used during the materialization is "transferred" to storage memory,
* so we won't acquire more memory than is actually needed to store the block.
*
- * @return in case of success, the estimated size of the stored data. In case of failure, return
- * an iterator containing the values of the block. The returned iterator will be backed
- * by the combination of the partially-unrolled block and the remaining elements of the
- * original input iterator. The caller must either fully consume this iterator or call
- * `close()` on it in order to free the storage memory consumed by the partially-unrolled
- * block.
+ * @param blockId The block id.
+ * @param values The values which need be stored.
+ * @param classTag the [[ClassTag]] for the block.
+ * @param memoryMode The values saved memory mode(ON_HEAP or OFF_HEAP).
+ * @param valuesHolder A holder that supports storing record of values into memory store as
+ * values or bytes.
+ * @return if the block is stored successfully, return the stored data size. Else return the
+ * memory has reserved for unrolling the block (There are two reasons for store failed:
+ * First, the block is partially-unrolled; second, the block is entirely unrolled and
+ * the actual stored data size is larger than reserved, but we can't request extra
+ * memory).
*/
- private[storage] def putIteratorAsValues[T](
+ private def putIterator[T](
blockId: BlockId,
values: Iterator[T],
- classTag: ClassTag[T]): Either[PartiallyUnrolledIterator[T], Long] = {
-
+ classTag: ClassTag[T],
+ memoryMode: MemoryMode,
+ valuesHolder: ValuesHolder[T]): Either[Long, Long] = {
require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
// Number of elements unrolled so far
@@ -198,12 +204,10 @@ private[spark] class MemoryStore(
val memoryGrowthFactor = conf.get(UNROLL_MEMORY_GROWTH_FACTOR)
// Keep track of unroll memory used by this particular block / putIterator() operation
var unrollMemoryUsedByThisBlock = 0L
- // Underlying vector for unrolling the block
- var vector = new SizeTrackingVector[T]()(classTag)
// Request enough memory to begin unrolling
keepUnrolling =
- reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold, MemoryMode.ON_HEAP)
+ reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold, memoryMode)
if (!keepUnrolling) {
logWarning(s"Failed to reserve initial memory threshold of " +
@@ -214,14 +218,14 @@ private[spark] class MemoryStore(
// Unroll this block safely, checking whether we have exceeded our threshold periodically
while (values.hasNext && keepUnrolling) {
- vector += values.next()
+ valuesHolder.storeValue(values.next())
if (elementsUnrolled % memoryCheckPeriod == 0) {
+ val currentSize = valuesHolder.estimatedSize()
// If our vector's size has exceeded the threshold, request more memory
- val currentSize = vector.estimateSize()
if (currentSize >= memoryThreshold) {
val amountToRequest = (currentSize * memoryGrowthFactor - memoryThreshold).toLong
keepUnrolling =
- reserveUnrollMemoryForThisTask(blockId, amountToRequest, MemoryMode.ON_HEAP)
+ reserveUnrollMemoryForThisTask(blockId, amountToRequest, memoryMode)
if (keepUnrolling) {
unrollMemoryUsedByThisBlock += amountToRequest
}
@@ -232,78 +236,86 @@ private[spark] class MemoryStore(
elementsUnrolled += 1
}
+ // Make sure that we have enough memory to store the block. By this point, it is possible that
+ // the block's actual memory usage has exceeded the unroll memory by a small amount, so we
+ // perform one final call to attempt to allocate additional memory if necessary.
if (keepUnrolling) {
- // We successfully unrolled the entirety of this block
- val arrayValues = vector.toArray
- vector = null
- val entry =
- new DeserializedMemoryEntry[T](arrayValues, SizeEstimator.estimate(arrayValues), classTag)
- val size = entry.size
- def transferUnrollToStorage(amount: Long): Unit = {
+ val entryBuilder = valuesHolder.getBuilder()
+ val size = entryBuilder.preciseSize
+ if (size > unrollMemoryUsedByThisBlock) {
+ val amountToRequest = size - unrollMemoryUsedByThisBlock
+ keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest, memoryMode)
+ if (keepUnrolling) {
+ unrollMemoryUsedByThisBlock += amountToRequest
+ }
+ }
+
+ if (keepUnrolling) {
+ val entry = entryBuilder.build()
// Synchronize so that transfer is atomic
memoryManager.synchronized {
- releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, amount)
- val success = memoryManager.acquireStorageMemory(blockId, amount, MemoryMode.ON_HEAP)
+ releaseUnrollMemoryForThisTask(memoryMode, unrollMemoryUsedByThisBlock)
+ val success = memoryManager.acquireStorageMemory(blockId, entry.size, memoryMode)
assert(success, "transferring unroll memory to storage memory failed")
}
- }
- // Acquire storage memory if necessary to store this block in memory.
- val enoughStorageMemory = {
- if (unrollMemoryUsedByThisBlock <= size) {
- val acquiredExtra =
- memoryManager.acquireStorageMemory(
- blockId, size - unrollMemoryUsedByThisBlock, MemoryMode.ON_HEAP)
- if (acquiredExtra) {
- transferUnrollToStorage(unrollMemoryUsedByThisBlock)
- }
- acquiredExtra
- } else { // unrollMemoryUsedByThisBlock > size
- // If this task attempt already owns more unroll memory than is necessary to store the
- // block, then release the extra memory that will not be used.
- val excessUnrollMemory = unrollMemoryUsedByThisBlock - size
- releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, excessUnrollMemory)
- transferUnrollToStorage(size)
- true
- }
- }
- if (enoughStorageMemory) {
+
entries.synchronized {
entries.put(blockId, entry)
}
- logInfo("Block %s stored as values in memory (estimated size %s, free %s)".format(
- blockId, Utils.bytesToString(size), Utils.bytesToString(maxMemory - blocksMemoryUsed)))
- Right(size)
+
+ logInfo("Block %s stored as values in memory (estimated size %s, free %s)".format(blockId,
+ Utils.bytesToString(entry.size), Utils.bytesToString(maxMemory - blocksMemoryUsed)))
+ Right(entry.size)
} else {
- assert(currentUnrollMemoryForThisTask >= unrollMemoryUsedByThisBlock,
- "released too much unroll memory")
+ // We ran out of space while unrolling the values for this block
+ logUnrollFailureMessage(blockId, entryBuilder.preciseSize)
+ Left(unrollMemoryUsedByThisBlock)
+ }
+ } else {
+ // We ran out of space while unrolling the values for this block
+ logUnrollFailureMessage(blockId, valuesHolder.estimatedSize())
+ Left(unrollMemoryUsedByThisBlock)
+ }
+ }
+
+ /**
+ * Attempt to put the given block in memory store as values.
+ *
+ * @return in case of success, the estimated size of the stored data. In case of failure, return
+ * an iterator containing the values of the block. The returned iterator will be backed
+ * by the combination of the partially-unrolled block and the remaining elements of the
+ * original input iterator. The caller must either fully consume this iterator or call
+ * `close()` on it in order to free the storage memory consumed by the partially-unrolled
+ * block.
+ */
+ private[storage] def putIteratorAsValues[T](
+ blockId: BlockId,
+ values: Iterator[T],
+ classTag: ClassTag[T]): Either[PartiallyUnrolledIterator[T], Long] = {
+
+ val valuesHolder = new DeserializedValuesHolder[T](classTag)
+
+ putIterator(blockId, values, classTag, MemoryMode.ON_HEAP, valuesHolder) match {
+ case Right(storedSize) => Right(storedSize)
+ case Left(unrollMemoryUsedByThisBlock) =>
+ val unrolledIterator = if (valuesHolder.vector != null) {
+ valuesHolder.vector.iterator
+ } else {
+ valuesHolder.arrayValues.toIterator
+ }
+
Left(new PartiallyUnrolledIterator(
this,
MemoryMode.ON_HEAP,
unrollMemoryUsedByThisBlock,
- unrolled = arrayValues.toIterator,
- rest = Iterator.empty))
- }
- } else {
- // We ran out of space while unrolling the values for this block
- logUnrollFailureMessage(blockId, vector.estimateSize())
- Left(new PartiallyUnrolledIterator(
- this,
- MemoryMode.ON_HEAP,
- unrollMemoryUsedByThisBlock,
- unrolled = vector.iterator,
- rest = values))
+ unrolled = unrolledIterator,
+ rest = values))
}
}
/**
* Attempt to put the given block in memory store as bytes.
*
- * It's possible that the iterator is too large to materialize and store in memory. To avoid
- * OOM exceptions, this method will gradually unroll the iterator while periodically checking
- * whether there is enough free memory. If the block is successfully materialized, then the
- * temporary unroll memory used during the materialization is "transferred" to storage memory,
- * so we won't acquire more memory than is actually needed to store the block.
- *
* @return in case of success, the estimated size of the stored data. In case of failure,
* return a handle which allows the caller to either finish the serialization by
* spilling to disk or to deserialize the partially-serialized block and reconstruct
@@ -319,25 +331,8 @@ private[spark] class MemoryStore(
require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
- val allocator = memoryMode match {
- case MemoryMode.ON_HEAP => ByteBuffer.allocate _
- case MemoryMode.OFF_HEAP => Platform.allocateDirectBuffer _
- }
-
- // Whether there is still enough memory for us to continue unrolling this block
- var keepUnrolling = true
- // Number of elements unrolled so far
- var elementsUnrolled = 0L
- // How often to check whether we need to request more memory
- val memoryCheckPeriod = conf.get(UNROLL_MEMORY_CHECK_PERIOD)
- // Memory to request as a multiple of current bbos size
- val memoryGrowthFactor = conf.get(UNROLL_MEMORY_GROWTH_FACTOR)
// Initial per-task memory to request for unrolling blocks (bytes).
val initialMemoryThreshold = unrollMemoryThreshold
- // Keep track of unroll memory used by this particular block / putIterator() operation
- var unrollMemoryUsedByThisBlock = 0L
- // Underlying buffer for unrolling the block
- val redirectableStream = new RedirectableOutputStream
val chunkSize = if (initialMemoryThreshold > Int.MaxValue) {
logWarning(s"Initial memory threshold of ${Utils.bytesToString(initialMemoryThreshold)} " +
s"is too large to be set as chunk size. Chunk size has been capped to " +
@@ -346,85 +341,22 @@ private[spark] class MemoryStore(
} else {
initialMemoryThreshold.toInt
}
- val bbos = new ChunkedByteBufferOutputStream(chunkSize, allocator)
- redirectableStream.setOutputStream(bbos)
- val serializationStream: SerializationStream = {
- val autoPick = !blockId.isInstanceOf[StreamBlockId]
- val ser = serializerManager.getSerializer(classTag, autoPick).newInstance()
- ser.serializeStream(serializerManager.wrapForCompression(blockId, redirectableStream))
- }
- // Request enough memory to begin unrolling
- keepUnrolling = reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold, memoryMode)
+ val valuesHolder = new SerializedValuesHolder[T](blockId, chunkSize, classTag,
+ memoryMode, serializerManager)
- if (!keepUnrolling) {
- logWarning(s"Failed to reserve initial memory threshold of " +
- s"${Utils.bytesToString(initialMemoryThreshold)} for computing block $blockId in memory.")
- } else {
- unrollMemoryUsedByThisBlock += initialMemoryThreshold
- }
-
- def reserveAdditionalMemoryIfNecessary(): Unit = {
- if (bbos.size > unrollMemoryUsedByThisBlock) {
- val amountToRequest = (bbos.size * memoryGrowthFactor - unrollMemoryUsedByThisBlock).toLong
- keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest, memoryMode)
- if (keepUnrolling) {
- unrollMemoryUsedByThisBlock += amountToRequest
- }
- }
- }
-
- // Unroll this block safely, checking whether we have exceeded our threshold
- while (values.hasNext && keepUnrolling) {
- serializationStream.writeObject(values.next())(classTag)
- elementsUnrolled += 1
- if (elementsUnrolled % memoryCheckPeriod == 0) {
- reserveAdditionalMemoryIfNecessary()
- }
- }
-
- // Make sure that we have enough memory to store the block. By this point, it is possible that
- // the block's actual memory usage has exceeded the unroll memory by a small amount, so we
- // perform one final call to attempt to allocate additional memory if necessary.
- if (keepUnrolling) {
- serializationStream.close()
- if (bbos.size > unrollMemoryUsedByThisBlock) {
- val amountToRequest = bbos.size - unrollMemoryUsedByThisBlock
- keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest, memoryMode)
- if (keepUnrolling) {
- unrollMemoryUsedByThisBlock += amountToRequest
- }
- }
- }
-
- if (keepUnrolling) {
- val entry = SerializedMemoryEntry[T](bbos.toChunkedByteBuffer, memoryMode, classTag)
- // Synchronize so that transfer is atomic
- memoryManager.synchronized {
- releaseUnrollMemoryForThisTask(memoryMode, unrollMemoryUsedByThisBlock)
- val success = memoryManager.acquireStorageMemory(blockId, entry.size, memoryMode)
- assert(success, "transferring unroll memory to storage memory failed")
- }
- entries.synchronized {
- entries.put(blockId, entry)
- }
- logInfo("Block %s stored as bytes in memory (estimated size %s, free %s)".format(
- blockId, Utils.bytesToString(entry.size),
- Utils.bytesToString(maxMemory - blocksMemoryUsed)))
- Right(entry.size)
- } else {
- // We ran out of space while unrolling the values for this block
- logUnrollFailureMessage(blockId, bbos.size)
- Left(
- new PartiallySerializedBlock(
+ putIterator(blockId, values, classTag, memoryMode, valuesHolder) match {
+ case Right(storedSize) => Right(storedSize)
+ case Left(unrollMemoryUsedByThisBlock) =>
+ Left(new PartiallySerializedBlock(
this,
serializerManager,
blockId,
- serializationStream,
- redirectableStream,
+ valuesHolder.serializationStream,
+ valuesHolder.redirectableStream,
unrollMemoryUsedByThisBlock,
memoryMode,
- bbos,
+ valuesHolder.bbos,
values,
classTag))
}
@@ -702,6 +634,94 @@ private[spark] class MemoryStore(
}
}
+private trait MemoryEntryBuilder[T] {
+ def preciseSize: Long
+ def build(): MemoryEntry[T]
+}
+
+private trait ValuesHolder[T] {
+ def storeValue(value: T): Unit
+ def estimatedSize(): Long
+
+ /**
+ * Note: After this method is called, the ValuesHolder is invalid, we can't store data and
+ * get estimate size again.
+ * @return a MemoryEntryBuilder which is used to build a memory entry and get the stored data
+ * size.
+ */
+ def getBuilder(): MemoryEntryBuilder[T]
+}
+
+/**
+ * A holder for storing the deserialized values.
+ */
+private class DeserializedValuesHolder[T] (classTag: ClassTag[T]) extends ValuesHolder[T] {
+ // Underlying vector for unrolling the block
+ var vector = new SizeTrackingVector[T]()(classTag)
+ var arrayValues: Array[T] = null
+
+ override def storeValue(value: T): Unit = {
+ vector += value
+ }
+
+ override def estimatedSize(): Long = {
+ vector.estimateSize()
+ }
+
+ override def getBuilder(): MemoryEntryBuilder[T] = new MemoryEntryBuilder[T] {
+ // We successfully unrolled the entirety of this block
+ arrayValues = vector.toArray
+ vector = null
+
+ override val preciseSize: Long = SizeEstimator.estimate(arrayValues)
+
+ override def build(): MemoryEntry[T] =
+ DeserializedMemoryEntry[T](arrayValues, preciseSize, classTag)
+ }
+}
+
+/**
+ * A holder for storing the serialized values.
+ */
+private class SerializedValuesHolder[T](
+ blockId: BlockId,
+ chunkSize: Int,
+ classTag: ClassTag[T],
+ memoryMode: MemoryMode,
+ serializerManager: SerializerManager) extends ValuesHolder[T] {
+ val allocator = memoryMode match {
+ case MemoryMode.ON_HEAP => ByteBuffer.allocate _
+ case MemoryMode.OFF_HEAP => Platform.allocateDirectBuffer _
+ }
+
+ val redirectableStream = new RedirectableOutputStream
+ val bbos = new ChunkedByteBufferOutputStream(chunkSize, allocator)
+ redirectableStream.setOutputStream(bbos)
+ val serializationStream: SerializationStream = {
+ val autoPick = !blockId.isInstanceOf[StreamBlockId]
+ val ser = serializerManager.getSerializer(classTag, autoPick).newInstance()
+ ser.serializeStream(serializerManager.wrapForCompression(blockId, redirectableStream))
+ }
+
+ override def storeValue(value: T): Unit = {
+ serializationStream.writeObject(value)(classTag)
+ }
+
+ override def estimatedSize(): Long = {
+ bbos.size
+ }
+
+ override def getBuilder(): MemoryEntryBuilder[T] = new MemoryEntryBuilder[T] {
+ // We successfully unrolled the entirety of this block
+ serializationStream.close()
+
+ override def preciseSize(): Long = bbos.size
+
+ override def build(): MemoryEntry[T] =
+ SerializedMemoryEntry[T](bbos.toChunkedByteBuffer, memoryMode, classTag)
+ }
+}
+
/**
* The result of a failed [[MemoryStore.putIteratorAsValues()]] call.
*
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org