Posted to reviews@spark.apache.org by liyezhang556520 <gi...@git.apache.org> on 2014/08/12 10:22:06 UTC

[GitHub] spark pull request: [SPARK-1777] Prevent OOMs from single partitio...

Github user liyezhang556520 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1165#discussion_r16100690
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/MemoryStore.scala ---
    @@ -141,6 +193,93 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
       }
     
       /**
    +   * Unroll the given block in memory safely.
    +   *
    +   * The safety of this operation refers to avoiding potential OOM exceptions caused by
    +   * unrolling the entirety of the block in memory at once. This is achieved by periodically
    +   * checking whether the memory restrictions for unrolling blocks are still satisfied,
    +   * stopping immediately if not. This check is a safeguard against the scenario in which
    +   * there is not enough free memory to accommodate the entirety of a single block.
    +   *
    +   * This method returns either an array with the contents of the entire block or an iterator
    +   * containing the values of the block (if the array would have exceeded available memory).
    +   */
    +  def unrollSafely(
    +      blockId: BlockId,
    +      values: Iterator[Any],
    +      droppedBlocks: ArrayBuffer[(BlockId, BlockStatus)])
    +    : Either[Array[Any], Iterator[Any]] = {
    +
    +    // Number of elements unrolled so far
    +    var elementsUnrolled = 0
    +    // Whether there is still enough memory for us to continue unrolling this block
    +    var keepUnrolling = true
    +    // Initial per-thread memory to request for unrolling blocks (bytes). Exposed for testing.
    +    val initialMemoryThreshold = conf.getLong("spark.storage.unrollMemoryThreshold", 1024 * 1024)
    +    // How often to check whether we need to request more memory
    +    val memoryCheckPeriod = 16
    +    // Memory currently reserved by this thread for this particular unrolling operation
    +    var memoryThreshold = initialMemoryThreshold
    +    // Memory to request as a multiple of current vector size
    +    val memoryGrowthFactor = 1.5
    +    // Previous unroll memory held by this thread, for releasing later (only at the very end)
    +    val previousMemoryReserved = currentUnrollMemoryForThisThread
    +    // Underlying vector for unrolling the block
    +    var vector = new SizeTrackingVector[Any]
    +
    +    // Request enough memory to begin unrolling
    +    keepUnrolling = reserveUnrollMemoryForThisThread(initialMemoryThreshold)
    +
    +    // Unroll this block safely, checking whether we have exceeded our threshold periodically
    +    try {
    +      while (values.hasNext && keepUnrolling) {
    +        vector += values.next()
    +        if (elementsUnrolled % memoryCheckPeriod == 0) {
    +          // If our vector's size has exceeded the threshold, request more memory
    +          val currentSize = vector.estimateSize()
    +          if (currentSize >= memoryThreshold) {
    +            val amountToRequest = (currentSize * (memoryGrowthFactor - 1)).toLong
    +            // Hold the accounting lock, in case another thread concurrently puts a block that
    +            // takes up the unrolling space we just ensured here
    +            accountingLock.synchronized {
    +              if (!reserveUnrollMemoryForThisThread(amountToRequest)) {
    +                // If the first request is not granted, try again after ensuring free space
    +                // If there is still not enough space, give up and drop the partition
    +                val spaceToEnsure = maxUnrollMemory - currentUnrollMemory
    +                if (spaceToEnsure > 0) {
    +                  val result = ensureFreeSpace(blockId, spaceToEnsure)
    +                  droppedBlocks ++= result.droppedBlocks
    +                }
    +                keepUnrolling = reserveUnrollMemoryForThisThread(amountToRequest)
    +              }
    +            }
    +            // New threshold is currentSize * memoryGrowthFactor
    +            memoryThreshold = currentSize + amountToRequest
    +          }
    +        }
    +        elementsUnrolled += 1
    +      }
    +
    +      if (keepUnrolling) {
    +        // We successfully unrolled the entirety of this block
    +        Left(vector.toArray)
    +      } else {
    +        // We ran out of space while unrolling the values for this block
    +        Right(vector.iterator ++ values)
    +      }
    +
    +    } finally {
    +      // If we return an array, the values returned do not depend on the underlying vector and
    +      // we can immediately free up space for other threads. Otherwise, if we return an iterator,
    +      // we release the memory claimed by this thread later on when the task finishes.
    +      if (keepUnrolling) {
    +        val amountToRelease = currentUnrollMemoryForThisThread - previousMemoryReserved
    +        releaseUnrollMemoryForThisThread(amountToRelease)
    --- End diff --
    
    @andrewor14 
    Hi Andrew, I have two questions here:
    1. If we return an array, I think we should not release the memory for this thread here; instead, we should release it immediately after the block has been put into memory. The memory is reserved by this thread so that it can cache the block in memory later, and if we release it here, the released memory might be counted as free memory by other threads. For example, consider two threads executing `unrollSafely` serially (for some reason not in parallel): the second thread can see a larger `actualFreeMemory` because the first thread has already released the memory it reserved for its pending put. Then, when the two threads call `putArray` in parallel to cache their blocks, they may both discover at the same time that there is no space left. Won't this lead to a wrong estimation?
    
    2. If we return an iterator, why should we release the memory later, when the task finishes? Returning an iterator means we ran out of memory, so we will either drop the block to disk (with `MEMORY_AND_DISK`) or simply skip caching it in memory (with `MEMORY_ONLY`). Either way, the memory recorded in `unrollMemoryMap` for this thread will no longer be used. Why not release it here so other threads can use it?
    
    Maybe I am misunderstanding the code; could you explain?
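
    For anyone following along, here is a minimal standalone sketch of the periodic-check unrolling pattern the diff implements. This is not Spark's actual `MemoryStore` code: the object name, the `maxMemory` parameter, the single-counter reservation, and the 4-bytes-per-Int size estimate are all simplifying assumptions, and the real method additionally coordinates with other threads via `accountingLock` and can evict blocks via `ensureFreeSpace`.

    ```scala
    import scala.collection.mutable.ArrayBuffer

    // Toy sketch of unrollSafely's periodic memory check. All names and the
    // size estimate are illustrative, not Spark's real API.
    object UnrollSketch {
      def unrollSafely(values: Iterator[Int], maxMemory: Long): Either[Array[Int], Iterator[Int]] = {
        val memoryCheckPeriod = 16        // check every 16 elements, as in the diff
        val memoryGrowthFactor = 1.5      // request ~50% headroom over current size
        val initialMemoryThreshold = 64L  // initial reservation in bytes (toy value)
        var memoryThreshold = initialMemoryThreshold
        var reserved = initialMemoryThreshold    // stands in for reserveUnrollMemoryForThisThread
        var keepUnrolling = reserved <= maxMemory
        var elementsUnrolled = 0
        val buffer = new ArrayBuffer[Int]        // stands in for SizeTrackingVector

        while (values.hasNext && keepUnrolling) {
          buffer += values.next()
          if (elementsUnrolled % memoryCheckPeriod == 0) {
            val currentSize = buffer.length * 4L // crude stand-in for estimateSize()
            if (currentSize >= memoryThreshold) {
              val amountToRequest = (currentSize * (memoryGrowthFactor - 1)).toLong
              if (reserved + amountToRequest <= maxMemory) {
                reserved += amountToRequest
                // New threshold is roughly currentSize * memoryGrowthFactor
                memoryThreshold = currentSize + amountToRequest
              } else {
                keepUnrolling = false            // give up rather than risk an OOM
              }
            }
          }
          elementsUnrolled += 1
        }

        if (keepUnrolling) Left(buffer.toArray)  // fully unrolled in memory
        else Right(buffer.iterator ++ values)    // unrolled prefix + untouched remainder
      }
    }
    ```

    With a generous budget the whole block materializes as a `Left` array; with a tiny budget the loop bails out early and returns a `Right` iterator chaining the already-unrolled prefix with the remaining input, so no elements are lost either way.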


