Posted to commits@spark.apache.org by we...@apache.org on 2018/01/26 11:18:52 UTC

spark git commit: [SPARK-22068][CORE] Reduce the duplicate code between putIteratorAsValues and putIteratorAsBytes

Repository: spark
Updated Branches:
  refs/heads/master c22eaa94e -> 3e2525147


[SPARK-22068][CORE] Reduce the duplicate code between putIteratorAsValues and putIteratorAsBytes

## What changes were proposed in this pull request?

The code logic of `MemoryStore.putIteratorAsValues` and `MemoryStore.putIteratorAsBytes` is almost the same, so we should reduce the duplicate code between them.
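
To make the shape of the change concrete, here is a minimal, self-contained Scala sketch of the pattern the patch applies (hypothetical names and fake memory accounting, not the actual Spark code): the shared unroll loop lives in one generic method, and the values-vs-bytes difference sits behind a small holder interface.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical, simplified stand-ins for the ValuesHolder abstraction this
// patch introduces; the memory accounting here is faked for illustration.
object PutIteratorSketch {
  trait HolderSketch[T] {
    def storeValue(value: T): Unit
    def estimatedSize(): Long
  }

  // "As values": keep deserialized objects in a growable buffer.
  class DeserializedSketch[T] extends HolderSketch[T] {
    private val buffer = new ArrayBuffer[T]
    override def storeValue(value: T): Unit = buffer += value
    override def estimatedSize(): Long = buffer.length.toLong * 8 // crude guess
  }

  // "As bytes": stream serialized bytes into an output buffer.
  class SerializedSketch[T] extends HolderSketch[T] {
    private val out = new java.io.ByteArrayOutputStream()
    override def storeValue(value: T): Unit = out.write(value.toString.getBytes("UTF-8"))
    override def estimatedSize(): Long = out.size().toLong
  }

  // The single shared unroll loop: Right(size) on success, Left(memoryUsed) on
  // failure, mirroring the Either[Long, Long] contract of the new putIterator.
  def putIterator[T](values: Iterator[T], holder: HolderSketch[T], budget: Long): Either[Long, Long] = {
    var used = 0L
    while (values.hasNext && used <= budget) {
      holder.storeValue(values.next())
      used = holder.estimatedSize()
    }
    if (used <= budget) Right(used) else Left(used)
  }

  def main(args: Array[String]): Unit = {
    // Both storage strategies reuse the same driver loop.
    println(putIterator(Iterator(1, 2, 3), new DeserializedSketch[Int], budget = 1024L))
    println(putIterator((1 to 100000).iterator, new SerializedSketch[Int], budget = 1024L))
  }
}
```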

## How was this patch tested?

Existing UT.

Author: Xianyang Liu <xi...@intel.com>

Closes #19285 from ConeyLiu/rmemorystore.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3e252514
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3e252514
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3e252514

Branch: refs/heads/master
Commit: 3e252514741447004f3c18ddd77c617b4e37cfaa
Parents: c22eaa9
Author: Xianyang Liu <xi...@intel.com>
Authored: Fri Jan 26 19:18:18 2018 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Fri Jan 26 19:18:18 2018 +0800

----------------------------------------------------------------------
 .../spark/storage/memory/MemoryStore.scala      | 336 ++++++++++---------
 1 file changed, 178 insertions(+), 158 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3e252514/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
index 17f7a69..4cc5bcb 100644
--- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
@@ -162,7 +162,7 @@ private[spark] class MemoryStore(
   }
 
   /**
-   * Attempt to put the given block in memory store as values.
+   * Attempt to put the given block in memory store as values or bytes.
    *
    * It's possible that the iterator is too large to materialize and store in memory. To avoid
    * OOM exceptions, this method will gradually unroll the iterator while periodically checking
@@ -170,18 +170,24 @@ private[spark] class MemoryStore(
    * temporary unroll memory used during the materialization is "transferred" to storage memory,
    * so we won't acquire more memory than is actually needed to store the block.
    *
-   * @return in case of success, the estimated size of the stored data. In case of failure, return
-   *         an iterator containing the values of the block. The returned iterator will be backed
-   *         by the combination of the partially-unrolled block and the remaining elements of the
-   *         original input iterator. The caller must either fully consume this iterator or call
-   *         `close()` on it in order to free the storage memory consumed by the partially-unrolled
-   *         block.
+   * @param blockId The block id.
+   * @param values The values which need to be stored.
+   * @param classTag the [[ClassTag]] for the block.
+   * @param memoryMode The memory mode (ON_HEAP or OFF_HEAP) used to store the values.
+   * @param valuesHolder A holder that supports storing records of values into the memory
+   *        store as values or bytes.
+   * @return if the block is stored successfully, return the stored data size. Otherwise return
+   *         the memory reserved for unrolling the block (a store can fail for two reasons:
+   *         first, the block is only partially unrolled; second, the block is entirely unrolled
+   *         but its actual stored size is larger than the reserved memory and we can't request
+   *         the extra memory).
    */
-  private[storage] def putIteratorAsValues[T](
+  private def putIterator[T](
       blockId: BlockId,
       values: Iterator[T],
-      classTag: ClassTag[T]): Either[PartiallyUnrolledIterator[T], Long] = {
-
+      classTag: ClassTag[T],
+      memoryMode: MemoryMode,
+      valuesHolder: ValuesHolder[T]): Either[Long, Long] = {
     require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
 
     // Number of elements unrolled so far
@@ -198,12 +204,10 @@ private[spark] class MemoryStore(
     val memoryGrowthFactor = conf.get(UNROLL_MEMORY_GROWTH_FACTOR)
     // Keep track of unroll memory used by this particular block / putIterator() operation
     var unrollMemoryUsedByThisBlock = 0L
-    // Underlying vector for unrolling the block
-    var vector = new SizeTrackingVector[T]()(classTag)
 
     // Request enough memory to begin unrolling
     keepUnrolling =
-      reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold, MemoryMode.ON_HEAP)
+      reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold, memoryMode)
 
     if (!keepUnrolling) {
       logWarning(s"Failed to reserve initial memory threshold of " +
@@ -214,14 +218,14 @@ private[spark] class MemoryStore(
 
     // Unroll this block safely, checking whether we have exceeded our threshold periodically
     while (values.hasNext && keepUnrolling) {
-      vector += values.next()
+      valuesHolder.storeValue(values.next())
       if (elementsUnrolled % memoryCheckPeriod == 0) {
+        val currentSize = valuesHolder.estimatedSize()
         // If our vector's size has exceeded the threshold, request more memory
-        val currentSize = vector.estimateSize()
         if (currentSize >= memoryThreshold) {
           val amountToRequest = (currentSize * memoryGrowthFactor - memoryThreshold).toLong
           keepUnrolling =
-            reserveUnrollMemoryForThisTask(blockId, amountToRequest, MemoryMode.ON_HEAP)
+            reserveUnrollMemoryForThisTask(blockId, amountToRequest, memoryMode)
           if (keepUnrolling) {
             unrollMemoryUsedByThisBlock += amountToRequest
           }
@@ -232,78 +236,86 @@ private[spark] class MemoryStore(
       elementsUnrolled += 1
     }
 
+    // Make sure that we have enough memory to store the block. By this point, it is possible that
+    // the block's actual memory usage has exceeded the unroll memory by a small amount, so we
+    // perform one final call to attempt to allocate additional memory if necessary.
     if (keepUnrolling) {
-      // We successfully unrolled the entirety of this block
-      val arrayValues = vector.toArray
-      vector = null
-      val entry =
-        new DeserializedMemoryEntry[T](arrayValues, SizeEstimator.estimate(arrayValues), classTag)
-      val size = entry.size
-      def transferUnrollToStorage(amount: Long): Unit = {
+      val entryBuilder = valuesHolder.getBuilder()
+      val size = entryBuilder.preciseSize
+      if (size > unrollMemoryUsedByThisBlock) {
+        val amountToRequest = size - unrollMemoryUsedByThisBlock
+        keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest, memoryMode)
+        if (keepUnrolling) {
+          unrollMemoryUsedByThisBlock += amountToRequest
+        }
+      }
+
+      if (keepUnrolling) {
+        val entry = entryBuilder.build()
         // Synchronize so that transfer is atomic
         memoryManager.synchronized {
-          releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, amount)
-          val success = memoryManager.acquireStorageMemory(blockId, amount, MemoryMode.ON_HEAP)
+          releaseUnrollMemoryForThisTask(memoryMode, unrollMemoryUsedByThisBlock)
+          val success = memoryManager.acquireStorageMemory(blockId, entry.size, memoryMode)
           assert(success, "transferring unroll memory to storage memory failed")
         }
-      }
-      // Acquire storage memory if necessary to store this block in memory.
-      val enoughStorageMemory = {
-        if (unrollMemoryUsedByThisBlock <= size) {
-          val acquiredExtra =
-            memoryManager.acquireStorageMemory(
-              blockId, size - unrollMemoryUsedByThisBlock, MemoryMode.ON_HEAP)
-          if (acquiredExtra) {
-            transferUnrollToStorage(unrollMemoryUsedByThisBlock)
-          }
-          acquiredExtra
-        } else { // unrollMemoryUsedByThisBlock > size
-          // If this task attempt already owns more unroll memory than is necessary to store the
-          // block, then release the extra memory that will not be used.
-          val excessUnrollMemory = unrollMemoryUsedByThisBlock - size
-          releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, excessUnrollMemory)
-          transferUnrollToStorage(size)
-          true
-        }
-      }
-      if (enoughStorageMemory) {
+
         entries.synchronized {
           entries.put(blockId, entry)
         }
-        logInfo("Block %s stored as values in memory (estimated size %s, free %s)".format(
-          blockId, Utils.bytesToString(size), Utils.bytesToString(maxMemory - blocksMemoryUsed)))
-        Right(size)
+
+        logInfo("Block %s stored as values in memory (estimated size %s, free %s)".format(blockId,
+          Utils.bytesToString(entry.size), Utils.bytesToString(maxMemory - blocksMemoryUsed)))
+        Right(entry.size)
       } else {
-        assert(currentUnrollMemoryForThisTask >= unrollMemoryUsedByThisBlock,
-          "released too much unroll memory")
+        // We ran out of space while unrolling the values for this block
+        logUnrollFailureMessage(blockId, entryBuilder.preciseSize)
+        Left(unrollMemoryUsedByThisBlock)
+      }
+    } else {
+      // We ran out of space while unrolling the values for this block
+      logUnrollFailureMessage(blockId, valuesHolder.estimatedSize())
+      Left(unrollMemoryUsedByThisBlock)
+    }
+  }
+
+  /**
+   * Attempt to put the given block in memory store as values.
+   *
+   * @return in case of success, the estimated size of the stored data. In case of failure, return
+   *         an iterator containing the values of the block. The returned iterator will be backed
+   *         by the combination of the partially-unrolled block and the remaining elements of the
+   *         original input iterator. The caller must either fully consume this iterator or call
+   *         `close()` on it in order to free the storage memory consumed by the partially-unrolled
+   *         block.
+   */
+  private[storage] def putIteratorAsValues[T](
+      blockId: BlockId,
+      values: Iterator[T],
+      classTag: ClassTag[T]): Either[PartiallyUnrolledIterator[T], Long] = {
+
+    val valuesHolder = new DeserializedValuesHolder[T](classTag)
+
+    putIterator(blockId, values, classTag, MemoryMode.ON_HEAP, valuesHolder) match {
+      case Right(storedSize) => Right(storedSize)
+      case Left(unrollMemoryUsedByThisBlock) =>
+        val unrolledIterator = if (valuesHolder.vector != null) {
+          valuesHolder.vector.iterator
+        } else {
+          valuesHolder.arrayValues.toIterator
+        }
+
         Left(new PartiallyUnrolledIterator(
           this,
           MemoryMode.ON_HEAP,
           unrollMemoryUsedByThisBlock,
-          unrolled = arrayValues.toIterator,
-          rest = Iterator.empty))
-      }
-    } else {
-      // We ran out of space while unrolling the values for this block
-      logUnrollFailureMessage(blockId, vector.estimateSize())
-      Left(new PartiallyUnrolledIterator(
-        this,
-        MemoryMode.ON_HEAP,
-        unrollMemoryUsedByThisBlock,
-        unrolled = vector.iterator,
-        rest = values))
+          unrolled = unrolledIterator,
+          rest = values))
     }
   }
 
   /**
    * Attempt to put the given block in memory store as bytes.
    *
-   * It's possible that the iterator is too large to materialize and store in memory. To avoid
-   * OOM exceptions, this method will gradually unroll the iterator while periodically checking
-   * whether there is enough free memory. If the block is successfully materialized, then the
-   * temporary unroll memory used during the materialization is "transferred" to storage memory,
-   * so we won't acquire more memory than is actually needed to store the block.
-   *
    * @return in case of success, the estimated size of the stored data. In case of failure,
    *         return a handle which allows the caller to either finish the serialization by
    *         spilling to disk or to deserialize the partially-serialized block and reconstruct
@@ -319,25 +331,8 @@ private[spark] class MemoryStore(
 
     require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
 
-    val allocator = memoryMode match {
-      case MemoryMode.ON_HEAP => ByteBuffer.allocate _
-      case MemoryMode.OFF_HEAP => Platform.allocateDirectBuffer _
-    }
-
-    // Whether there is still enough memory for us to continue unrolling this block
-    var keepUnrolling = true
-    // Number of elements unrolled so far
-    var elementsUnrolled = 0L
-    // How often to check whether we need to request more memory
-    val memoryCheckPeriod = conf.get(UNROLL_MEMORY_CHECK_PERIOD)
-    // Memory to request as a multiple of current bbos size
-    val memoryGrowthFactor = conf.get(UNROLL_MEMORY_GROWTH_FACTOR)
     // Initial per-task memory to request for unrolling blocks (bytes).
     val initialMemoryThreshold = unrollMemoryThreshold
-    // Keep track of unroll memory used by this particular block / putIterator() operation
-    var unrollMemoryUsedByThisBlock = 0L
-    // Underlying buffer for unrolling the block
-    val redirectableStream = new RedirectableOutputStream
     val chunkSize = if (initialMemoryThreshold > Int.MaxValue) {
       logWarning(s"Initial memory threshold of ${Utils.bytesToString(initialMemoryThreshold)} " +
         s"is too large to be set as chunk size. Chunk size has been capped to " +
@@ -346,85 +341,22 @@ private[spark] class MemoryStore(
     } else {
       initialMemoryThreshold.toInt
     }
-    val bbos = new ChunkedByteBufferOutputStream(chunkSize, allocator)
-    redirectableStream.setOutputStream(bbos)
-    val serializationStream: SerializationStream = {
-      val autoPick = !blockId.isInstanceOf[StreamBlockId]
-      val ser = serializerManager.getSerializer(classTag, autoPick).newInstance()
-      ser.serializeStream(serializerManager.wrapForCompression(blockId, redirectableStream))
-    }
 
-    // Request enough memory to begin unrolling
-    keepUnrolling = reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold, memoryMode)
+    val valuesHolder = new SerializedValuesHolder[T](blockId, chunkSize, classTag,
+      memoryMode, serializerManager)
 
-    if (!keepUnrolling) {
-      logWarning(s"Failed to reserve initial memory threshold of " +
-        s"${Utils.bytesToString(initialMemoryThreshold)} for computing block $blockId in memory.")
-    } else {
-      unrollMemoryUsedByThisBlock += initialMemoryThreshold
-    }
-
-    def reserveAdditionalMemoryIfNecessary(): Unit = {
-      if (bbos.size > unrollMemoryUsedByThisBlock) {
-        val amountToRequest = (bbos.size * memoryGrowthFactor - unrollMemoryUsedByThisBlock).toLong
-        keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest, memoryMode)
-        if (keepUnrolling) {
-          unrollMemoryUsedByThisBlock += amountToRequest
-        }
-      }
-    }
-
-    // Unroll this block safely, checking whether we have exceeded our threshold
-    while (values.hasNext && keepUnrolling) {
-      serializationStream.writeObject(values.next())(classTag)
-      elementsUnrolled += 1
-      if (elementsUnrolled % memoryCheckPeriod == 0) {
-        reserveAdditionalMemoryIfNecessary()
-      }
-    }
-
-    // Make sure that we have enough memory to store the block. By this point, it is possible that
-    // the block's actual memory usage has exceeded the unroll memory by a small amount, so we
-    // perform one final call to attempt to allocate additional memory if necessary.
-    if (keepUnrolling) {
-      serializationStream.close()
-      if (bbos.size > unrollMemoryUsedByThisBlock) {
-        val amountToRequest = bbos.size - unrollMemoryUsedByThisBlock
-        keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest, memoryMode)
-        if (keepUnrolling) {
-          unrollMemoryUsedByThisBlock += amountToRequest
-        }
-      }
-    }
-
-    if (keepUnrolling) {
-      val entry = SerializedMemoryEntry[T](bbos.toChunkedByteBuffer, memoryMode, classTag)
-      // Synchronize so that transfer is atomic
-      memoryManager.synchronized {
-        releaseUnrollMemoryForThisTask(memoryMode, unrollMemoryUsedByThisBlock)
-        val success = memoryManager.acquireStorageMemory(blockId, entry.size, memoryMode)
-        assert(success, "transferring unroll memory to storage memory failed")
-      }
-      entries.synchronized {
-        entries.put(blockId, entry)
-      }
-      logInfo("Block %s stored as bytes in memory (estimated size %s, free %s)".format(
-        blockId, Utils.bytesToString(entry.size),
-        Utils.bytesToString(maxMemory - blocksMemoryUsed)))
-      Right(entry.size)
-    } else {
-      // We ran out of space while unrolling the values for this block
-      logUnrollFailureMessage(blockId, bbos.size)
-      Left(
-        new PartiallySerializedBlock(
+    putIterator(blockId, values, classTag, memoryMode, valuesHolder) match {
+      case Right(storedSize) => Right(storedSize)
+      case Left(unrollMemoryUsedByThisBlock) =>
+        Left(new PartiallySerializedBlock(
           this,
           serializerManager,
           blockId,
-          serializationStream,
-          redirectableStream,
+          valuesHolder.serializationStream,
+          valuesHolder.redirectableStream,
           unrollMemoryUsedByThisBlock,
           memoryMode,
-          bbos,
+          valuesHolder.bbos,
           values,
           classTag))
     }
@@ -702,6 +634,94 @@ private[spark] class MemoryStore(
   }
 }
 
+private trait MemoryEntryBuilder[T] {
+  def preciseSize: Long
+  def build(): MemoryEntry[T]
+}
+
+private trait ValuesHolder[T] {
+  def storeValue(value: T): Unit
+  def estimatedSize(): Long
+
+  /**
+   * Note: After this method is called, the ValuesHolder is invalid: we can't store more data or
+   * get the estimated size again.
+   * @return a MemoryEntryBuilder which is used to build a memory entry and get the stored data
+   *         size.
+   */
+  def getBuilder(): MemoryEntryBuilder[T]
+}
+
+/**
+ * A holder for storing the deserialized values.
+ */
+private class DeserializedValuesHolder[T] (classTag: ClassTag[T]) extends ValuesHolder[T] {
+  // Underlying vector for unrolling the block
+  var vector = new SizeTrackingVector[T]()(classTag)
+  var arrayValues: Array[T] = null
+
+  override def storeValue(value: T): Unit = {
+    vector += value
+  }
+
+  override def estimatedSize(): Long = {
+    vector.estimateSize()
+  }
+
+  override def getBuilder(): MemoryEntryBuilder[T] = new MemoryEntryBuilder[T] {
+    // We successfully unrolled the entirety of this block
+    arrayValues = vector.toArray
+    vector = null
+
+    override val preciseSize: Long = SizeEstimator.estimate(arrayValues)
+
+    override def build(): MemoryEntry[T] =
+      DeserializedMemoryEntry[T](arrayValues, preciseSize, classTag)
+  }
+}
+
+/**
+ * A holder for storing the serialized values.
+ */
+private class SerializedValuesHolder[T](
+    blockId: BlockId,
+    chunkSize: Int,
+    classTag: ClassTag[T],
+    memoryMode: MemoryMode,
+    serializerManager: SerializerManager) extends ValuesHolder[T] {
+  val allocator = memoryMode match {
+    case MemoryMode.ON_HEAP => ByteBuffer.allocate _
+    case MemoryMode.OFF_HEAP => Platform.allocateDirectBuffer _
+  }
+
+  val redirectableStream = new RedirectableOutputStream
+  val bbos = new ChunkedByteBufferOutputStream(chunkSize, allocator)
+  redirectableStream.setOutputStream(bbos)
+  val serializationStream: SerializationStream = {
+    val autoPick = !blockId.isInstanceOf[StreamBlockId]
+    val ser = serializerManager.getSerializer(classTag, autoPick).newInstance()
+    ser.serializeStream(serializerManager.wrapForCompression(blockId, redirectableStream))
+  }
+
+  override def storeValue(value: T): Unit = {
+    serializationStream.writeObject(value)(classTag)
+  }
+
+  override def estimatedSize(): Long = {
+    bbos.size
+  }
+
+  override def getBuilder(): MemoryEntryBuilder[T] = new MemoryEntryBuilder[T] {
+    // We successfully unrolled the entirety of this block
+    serializationStream.close()
+
+    override def preciseSize: Long = bbos.size
+
+    override def build(): MemoryEntry[T] =
+      SerializedMemoryEntry[T](bbos.toChunkedByteBuffer, memoryMode, classTag)
+  }
+}
+
 /**
  * The result of a failed [[MemoryStore.putIteratorAsValues()]] call.
  *
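
One subtlety the unified `putIterator` keeps from both old paths: after unrolling succeeds, the builder's `preciseSize` may exceed the unroll memory reserved so far, so one final reservation is attempted before the entry is committed. A toy sketch of that reconciliation step, with simplified names (`reserveExtra` stands in for `reserveUnrollMemoryForThisTask`), not the actual Spark code:

```scala
// Toy sketch of the final size reconciliation performed by the unified
// putIterator once the whole iterator has been unrolled.
def finalizeStore(
    preciseSize: Long,
    reservedSoFar: Long,
    reserveExtra: Long => Boolean): Either[Long, Long] = {
  // Top up the reservation if the precise size exceeds what unrolling reserved.
  val ok = preciseSize <= reservedSoFar || reserveExtra(preciseSize - reservedSoFar)
  if (ok) Right(preciseSize) // commit: entry.size is charged to storage memory
  else Left(reservedSoFar)   // fail: report the unroll memory still held
}

// Example: 100 bytes reserved during unroll, precise size turns out to be 120;
// the store succeeds only if the extra 20 bytes can be granted.
finalizeStore(120L, 100L, extra => extra <= 50L) // Right(120)
finalizeStore(120L, 100L, _ => false)            // Left(100)
```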

