You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by jiangxb1987 <gi...@git.apache.org> on 2017/11/07 14:24:42 UTC
[GitHub] spark pull request #19285: [SPARK-22068][CORE]Reduce the duplicate code between putIteratorAsValues and putIteratorAsBytes
Github user jiangxb1987 commented on a diff in the pull request:
https://github.com/apache/spark/pull/19285#discussion_r149379088
--- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -261,37 +259,97 @@ private[spark] class MemoryStore(
// If this task attempt already owns more unroll memory than is necessary to store the
// block, then release the extra memory that will not be used.
val excessUnrollMemory = unrollMemoryUsedByThisBlock - size
- releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, excessUnrollMemory)
+ releaseUnrollMemoryForThisTask(memoryMode, excessUnrollMemory)
transferUnrollToStorage(size)
true
}
}
+
if (enoughStorageMemory) {
entries.synchronized {
- entries.put(blockId, entry)
+ entries.put(blockId, createMemoryEntry())
}
logInfo("Block %s stored as values in memory (estimated size %s, free %s)".format(
blockId, Utils.bytesToString(size), Utils.bytesToString(maxMemory - blocksMemoryUsed)))
Right(size)
} else {
assert(currentUnrollMemoryForThisTask >= unrollMemoryUsedByThisBlock,
"released too much unroll memory")
+ Left(unrollMemoryUsedByThisBlock)
+ }
+ } else {
+ Left(unrollMemoryUsedByThisBlock)
+ }
+ }
+
+ /**
+ * Attempt to put the given block in memory store as values.
+ *
+ * It's possible that the iterator is too large to materialize and store in memory. To avoid
+ * OOM exceptions, this method will gradually unroll the iterator while periodically checking
+ * whether there is enough free memory. If the block is successfully materialized, then the
+ * temporary unroll memory used during the materialization is "transferred" to storage memory,
+ * so we won't acquire more memory than is actually needed to store the block.
+ *
+ * @return in case of success, the estimated size of the stored data. In case of failure, return
+ * an iterator containing the values of the block. The returned iterator will be backed
+ * by the combination of the partially-unrolled block and the remaining elements of the
+ * original input iterator. The caller must either fully consume this iterator or call
+ * `close()` on it in order to free the storage memory consumed by the partially-unrolled
+ * block.
+ */
+ private[storage] def putIteratorAsValues[T](
+ blockId: BlockId,
+ values: Iterator[T],
+ classTag: ClassTag[T]): Either[PartiallyUnrolledIterator[T], Long] = {
+
+ // Underlying vector for unrolling the block
+ var vector = new SizeTrackingVector[T]()(classTag)
+ var arrayValues: Array[T] = null
+ var preciseSize: Long = -1
+
+ def storeValue(value: T): Unit = {
+ vector += value
+ }
+
+ def estimateSize(precise: Boolean): Long = {
+ if (precise) {
+ // We only call need the precise size after all values unrolled.
+ arrayValues = vector.toArray
+ preciseSize = SizeEstimator.estimate(arrayValues)
+ vector = null
--- End diff --
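It looks risky to set `vector` to null inside `estimateSize`. A method that reads like a size query should not also discard the unroll buffer as a side effect. After `estimateSize(precise = true)` runs, any later call to `storeValue` (which does `vector += value`) would throw a NullPointerException.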
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org