Posted to reviews@spark.apache.org by jiangxb1987 <gi...@git.apache.org> on 2017/11/07 14:24:42 UTC

[GitHub] spark pull request #19285: [SPARK-22068][CORE]Reduce the duplicate code betw...

Github user jiangxb1987 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19285#discussion_r149379088
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
    @@ -261,37 +259,97 @@ private[spark] class MemoryStore(
               // If this task attempt already owns more unroll memory than is necessary to store the
               // block, then release the extra memory that will not be used.
               val excessUnrollMemory = unrollMemoryUsedByThisBlock - size
    -          releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, excessUnrollMemory)
    +          releaseUnrollMemoryForThisTask(memoryMode, excessUnrollMemory)
               transferUnrollToStorage(size)
               true
             }
           }
    +
           if (enoughStorageMemory) {
             entries.synchronized {
    -          entries.put(blockId, entry)
    +          entries.put(blockId, createMemoryEntry())
             }
             logInfo("Block %s stored as values in memory (estimated size %s, free %s)".format(
               blockId, Utils.bytesToString(size), Utils.bytesToString(maxMemory - blocksMemoryUsed)))
             Right(size)
           } else {
             assert(currentUnrollMemoryForThisTask >= unrollMemoryUsedByThisBlock,
               "released too much unroll memory")
    +        Left(unrollMemoryUsedByThisBlock)
    +      }
    +    } else {
    +      Left(unrollMemoryUsedByThisBlock)
    +    }
    +  }
    +
    +  /**
    +   * Attempt to put the given block in memory store as values.
    +   *
    +   * It's possible that the iterator is too large to materialize and store in memory. To avoid
    +   * OOM exceptions, this method will gradually unroll the iterator while periodically checking
    +   * whether there is enough free memory. If the block is successfully materialized, then the
    +   * temporary unroll memory used during the materialization is "transferred" to storage memory,
    +   * so we won't acquire more memory than is actually needed to store the block.
    +   *
    +   * @return in case of success, the estimated size of the stored data. In case of failure, return
    +   *         an iterator containing the values of the block. The returned iterator will be backed
    +   *         by the combination of the partially-unrolled block and the remaining elements of the
    +   *         original input iterator. The caller must either fully consume this iterator or call
    +   *         `close()` on it in order to free the storage memory consumed by the partially-unrolled
    +   *         block.
    +   */
    +  private[storage] def putIteratorAsValues[T](
    +      blockId: BlockId,
    +      values: Iterator[T],
    +      classTag: ClassTag[T]): Either[PartiallyUnrolledIterator[T], Long] = {
    +
    +    // Underlying vector for unrolling the block
    +    var vector = new SizeTrackingVector[T]()(classTag)
    +    var arrayValues: Array[T] = null
    +    var preciseSize: Long = -1
    +
    +    def storeValue(value: T): Unit = {
    +      vector += value
    +    }
    +
    +    def estimateSize(precise: Boolean): Long = {
    +      if (precise) {
    +        // We only need the precise size after all the values have been unrolled.
    +        arrayValues = vector.toArray
    +        preciseSize = SizeEstimator.estimate(arrayValues)
    +        vector = null
    --- End diff --
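
    For context, here is a minimal, self-contained sketch of the gradual-unroll
    pattern the new javadoc describes. Every name in it (`UnrollSketch`,
    `reserveUnrollMemory`, the fixed per-element cost) is a hypothetical
    stand-in, not Spark's MemoryManager API; the real code tracks sampled
    sizes with a SizeTrackingVector rather than a constant `bytesPerElement`.

        import scala.collection.mutable.ArrayBuffer

        object UnrollSketch {
          // Hypothetical memory budget, in bytes.
          private var freeMemory: Long = 1024L

          // Hypothetical stand-in for acquiring unroll memory from a manager.
          private def reserveUnrollMemory(amount: Long): Boolean = synchronized {
            if (freeMemory >= amount) { freeMemory -= amount; true } else false
          }

          // Unroll `values`, re-checking the memory budget every `checkPeriod`
          // elements. Right(count) on success, Left(partial) when memory runs
          // out, mirroring the Either contract of putIteratorAsValues.
          def unroll[T](
              values: Iterator[T],
              bytesPerElement: Long,
              checkPeriod: Int = 16): Either[Seq[T], Long] = {
            val buffer = new ArrayBuffer[T]
            var elementsUnrolled = 0L
            var keepUnrolling = reserveUnrollMemory(checkPeriod * bytesPerElement)
            while (values.hasNext && keepUnrolling) {
              buffer += values.next()
              elementsUnrolled += 1
              if (elementsUnrolled % checkPeriod == 0) {
                // Ask for the next chunk before continuing, rather than
                // materializing the whole iterator up front and risking OOM.
                keepUnrolling = reserveUnrollMemory(checkPeriod * bytesPerElement)
              }
            }
            if (keepUnrolling) Right(elementsUnrolled) else Left(buffer.toSeq)
          }
        }

    On success, the real method then "transfers" the reserved unroll memory to
    storage memory rather than releasing and re-acquiring it, which is the
    invariant the javadoc calls out.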
    
    It looks scary to set `vector` to null inside the function `estimateSize`: callers wouldn't expect a size-estimation method to invalidate the underlying vector as a side effect.
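
    One hypothetical way to address this (a sketch, not code from this PR)
    is to keep the precise-size path free of hidden state changes: return
    the materialized array together with its size, and let the caller drop
    its own reference to the vector. `estimatePreciseSize` below is a
    stand-in for Spark's SizeEstimator.estimate:

        import scala.collection.mutable.ArrayBuffer
        import scala.reflect.ClassTag

        object PreciseSizeSketch {
          // Stand-in for SizeEstimator.estimate, which walks the object graph;
          // the constants here are placeholders, not a real size model.
          def estimatePreciseSize[T](arr: Array[T]): Long = 16L + 8L * arr.length

          // Returns the array and its precise size; nothing is nulled out as a
          // side effect inside the size computation.
          def materialize[T: ClassTag](vector: ArrayBuffer[T]): (Array[T], Long) = {
            val arrayValues = vector.toArray
            (arrayValues, estimatePreciseSize(arrayValues))
          }

          def main(args: Array[String]): Unit = {
            var vector: ArrayBuffer[Int] = ArrayBuffer(1, 2, 3)
            val (arrayValues, preciseSize) = materialize(vector)
            vector = null  // the invalidation is explicit, at the call site
            println(s"${arrayValues.length} values, ~$preciseSize bytes")
          }
        }

    Renaming the function (for example to `materializeAndEstimateSize`) would
    also make the ownership transfer obvious to readers.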


---
