You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by JoshRosen <gi...@git.apache.org> on 2016/03/17 20:03:27 UTC

[GitHub] spark pull request: [SPARK-13980][WIP] Incrementally serialize blo...

GitHub user JoshRosen opened a pull request:

    https://github.com/apache/spark/pull/11791

    [SPARK-13980][WIP] Incrementally serialize blocks while unrolling them in MemoryStore

    When a block is persisted in the MemoryStore at a serialized storage level, the current MemoryStore.putIterator() code will unroll the entire iterator as Java objects in memory, then will turn around and serialize an iterator obtained from the unrolled array. This is inefficient and doubles our peak memory requirements.
    
    Instead, I think that we should incrementally serialize blocks while unrolling them.
    
    A downside to incremental serialization is the fact that we will need to deserialize the partially-unrolled data in case there is not enough space to unroll the block and the block cannot be dropped to disk. However, I'm hoping that the memory efficiency improvements will outweigh any performance losses as a result of extra serialization in that hopefully-rare case.
    
    Diff containing only this patch's changes: https://github.com/JoshRosen/spark/compare/chunked-block-serialization...JoshRosen:serialize-incrementally?expand=1
    
    This patch is marked as WIP because it's rebased on top of / blocked by #11748.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/JoshRosen/spark serialize-incrementally

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/11791.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #11791
    
----
commit 735eca68d8efcd150d47631644cf848b4d98603e
Author: Josh Rosen <jo...@databricks.com>
Date:   2016-03-15T04:57:16Z

    Split MemoryEntry into two separate classes (serialized and deserialized)

commit 8f0828986b72ce722cfe0360ae863971547fc58b
Author: Josh Rosen <jo...@databricks.com>
Date:   2016-03-15T18:53:54Z

    Add ChunkedByteBuffer and use it in storage layer.

commit 79b1a6a31236b81c444dda1e8ee1cfdf2f3c36ae
Author: Josh Rosen <jo...@databricks.com>
Date:   2016-03-15T20:53:27Z

    Add test cases and fix bug in ChunkedByteBuffer.toInputStream()

commit 7dbcd5a9ef0c669f5db97990af944d8b63300e97
Author: Josh Rosen <jo...@databricks.com>
Date:   2016-03-15T22:05:23Z

    WIP towards understanding destruction.

commit 3fbec212d9f714386121b4aed791d6c9fb1359a2
Author: Josh Rosen <jo...@databricks.com>
Date:   2016-03-15T22:39:27Z

    Small fixes to dispose behavior.

commit e5e663f22094333dac6e184c78176ee658e3441e
Author: Josh Rosen <jo...@databricks.com>
Date:   2016-03-15T22:49:24Z

    Modify BlockManager.dataSerialize to write ChunkedByteBuffers.

commit de62f0d0a5f128dd91173e73b214a3297dd203d4
Author: Josh Rosen <jo...@databricks.com>
Date:   2016-03-16T06:47:21Z

    Merge remote-tracking branch 'origin/master' into chunked-block-serialization

commit 0a347fdd9ec0e94eab17eb0f33c93acd1afbdcfb
Author: Josh Rosen <jo...@databricks.com>
Date:   2016-03-16T06:56:02Z

    Fix test compilation in streaming.

commit 6852c482a4935b992c199810f1156952f1e93a8c
Author: Josh Rosen <jo...@databricks.com>
Date:   2016-03-16T20:47:45Z

    Merge remote-tracking branch 'origin/master' into chunked-block-serialization

commit 43f8fa6ae5ba093655cdbd55ca56959a7652de56
Author: Josh Rosen <jo...@databricks.com>
Date:   2016-03-16T20:54:55Z

    Allow ChunkedByteBuffer to contain no chunks.

commit 25e68841541b45d7eedc0447cc8154d746ee8db2
Author: Josh Rosen <jo...@databricks.com>
Date:   2016-03-16T21:00:21Z

    Document toByteBuffer() and toArray() size limitations.

commit 325c83d8909472428ae65620033fff4887c36e06
Author: Josh Rosen <jo...@databricks.com>
Date:   2016-03-16T21:07:42Z

    Move dispose() from BlockManager to StorageUtils.
    
    It was a static method before, but its location was confusing.

commit 4f5074ece49030a6e7134f7ece706ed441c02ee4
Author: Josh Rosen <jo...@databricks.com>
Date:   2016-03-16T21:11:14Z

    Better documentation for dispose() methods.

commit b6ddf3ed40cc90ec94b7e4917808f8a726b597ee
Author: Josh Rosen <jo...@databricks.com>
Date:   2016-03-16T21:12:39Z

    Rename limit to size.

commit 719ad3c4e9e942ce62cbcf288788aca785690a7e
Author: Josh Rosen <jo...@databricks.com>
Date:   2016-03-16T21:20:08Z

    Implement missing InputStream methods.

commit 23006076dcb73095a9eaa7e2524a10c048bae646
Author: Josh Rosen <jo...@databricks.com>
Date:   2016-03-16T22:00:10Z

    More comments.

commit 3fc0b66981aa2d45be129986f0dc5bd595e08b22
Author: Josh Rosen <jo...@databricks.com>
Date:   2016-03-16T22:02:42Z

    Fix confusing getChunks().head

commit c747c8546ff248b8b08285e92afad2fe71875acd
Author: Josh Rosen <jo...@databricks.com>
Date:   2016-03-17T18:08:34Z

    Merge remote-tracking branch 'origin/master' into chunked-block-serialization

commit cb9311b30636ced3854fd035340092e497750b47
Author: Josh Rosen <jo...@databricks.com>
Date:   2016-03-17T18:09:29Z

    Fix logging import.

commit 8515cab13a7dfe08919b9500ee9194f5b7b7aa24
Author: Josh Rosen <jo...@databricks.com>
Date:   2016-03-15T00:48:26Z

    Duplicate the code to separate putIterator() by serialization level.

commit 7dc362331f3f549670ecd9488db456b4136a3ad7
Author: Josh Rosen <jo...@databricks.com>
Date:   2016-03-15T23:48:21Z

    Incrementally serialize blocks while unrolling them.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13980] Incrementally serialize blocks w...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11791#issuecomment-199642011
  
    **[Test build #53742 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/53742/consoleFull)** for PR 11791 at commit [`4976b74`](https://github.com/apache/spark/commit/4976b743c8ac8eda86b548ea6a858d9c6f164591).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13980] Incrementally serialize blocks w...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11791#discussion_r57396253
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
    @@ -279,13 +275,117 @@ private[spark] class MemoryStore(
         }
       }
     
    +  /**
    +   * Attempt to put the given block in memory store as bytes.
    +   *
    +   * It's possible that the iterator is too large to materialize and store in memory. To avoid
    +   * OOM exceptions, this method will gradually unroll the iterator while periodically checking
    +   * whether there is enough free memory. If the block is successfully materialized, then the
    +   * temporary unroll memory used during the materialization is "transferred" to storage memory,
    +   * so we won't acquire more memory than is actually needed to store the block.
    +   *
    +   * @return in case of success, the estimated the estimated size of the stored data. In case of
    +   *         failure, return a handle which allows the caller to either finish the serialization
    +   *         by spilling to disk or to deserialize the partially-serialized block and reconstruct
    +   *         the original input iterator. The caller must either fully consume this result
    +   *         iterator or call `discard()` on it in order to free the storage memory consumed by the
    +   *         partially-unrolled block.
    +   */
    +  private[storage] def putIteratorAsBytes[T](
    +      blockId: BlockId,
    +      values: Iterator[T],
    +      classTag: ClassTag[T]): Either[PartiallySerializedBlock[T], Long] = {
    +
    +    require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
    +
    +    // Whether there is still enough memory for us to continue unrolling this block
    +    var keepUnrolling = true
    +    // Initial per-task memory to request for unrolling blocks (bytes).
    +    val initialMemoryThreshold = unrollMemoryThreshold
    +    // Keep track of unroll memory used by this particular block / putIterator() operation
    +    var unrollMemoryUsedByThisBlock = 0L
    +    // Underlying buffer for unrolling the block
    +    val redirectableStream = new RedirectableOutputStream
    +    val byteArrayChunkOutputStream = new ByteArrayChunkOutputStream(initialMemoryThreshold.toInt)
    +    redirectableStream.setOutputStream(byteArrayChunkOutputStream)
    +    val serializationStream: SerializationStream = {
    +      val ser = serializerManager.getSerializer(classTag).newInstance()
    +      ser.serializeStream(serializerManager.wrapForCompression(blockId, redirectableStream))
    +    }
    +
    +    // Request enough memory to begin unrolling
    +    keepUnrolling = reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold)
    +
    +    if (!keepUnrolling) {
    +      logWarning(s"Failed to reserve initial memory threshold of " +
    +        s"${Utils.bytesToString(initialMemoryThreshold)} for computing block $blockId in memory.")
    +    } else {
    +      unrollMemoryUsedByThisBlock += initialMemoryThreshold
    +    }
    +
    +    def reserveAdditionalMemoryIfNecessary(): Unit = {
    +      if (byteArrayChunkOutputStream.size > unrollMemoryUsedByThisBlock) {
    +        val amountToRequest = byteArrayChunkOutputStream.size - unrollMemoryUsedByThisBlock
    +        keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest)
    +        if (keepUnrolling) {
    +          unrollMemoryUsedByThisBlock += amountToRequest
    +        }
    +      }
    +    }
    +
    +    // Unroll this block safely, checking whether we have exceeded our threshold
    +    while (values.hasNext && keepUnrolling) {
    +      serializationStream.writeObject(values.next())(classTag)
    +      reserveAdditionalMemoryIfNecessary()
    +    }
    +
    +    // Make sure that we have enough memory to store the block. By this point, it is possible that
    +    // the block's actual memory usage has exceeded the unroll memory by a small amount, so we
    +    // perform one final call to attempt to allocate additional memory if necessary.
    +    if (keepUnrolling) {
    +      serializationStream.close()
    +      reserveAdditionalMemoryIfNecessary()
    +    }
    +
    +    if (keepUnrolling) {
    --- End diff --
    
    why have two of these `if`'s? Can't we merge them?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13980] Incrementally serialize blocks w...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11791#discussion_r56916240
  
    --- Diff: core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala ---
    @@ -1074,7 +1074,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
         assert(memoryStore.currentUnrollMemoryForThisTask === 0)
     
         // Unroll with all the space in the world. This should succeed.
    -    var putResult = memoryStore.putIterator("unroll", smallList.iterator, StorageLevel.MEMORY_ONLY)
    +    var putResult = memoryStore.putIteratorAsValues("unroll", smallList.iterator)
    --- End diff --
    
    Good call; yes. I'll add some, since there's actually a couple of new branches that need coverage.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13980] Incrementally serialize blocks w...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/11791


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13980] Incrementally serialize blocks w...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11791#issuecomment-200977781
  
    **[Test build #54057 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/54057/consoleFull)** for PR 11791 at commit [`749df73`](https://github.com/apache/spark/commit/749df73c8dee8733a2617ff9ab8563fe3ed9de66).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13980] Incrementally serialize blocks w...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11791#issuecomment-200534807
  
    **[Test build #53957 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/53957/consoleFull)** for PR 11791 at commit [`768a8d9`](https://github.com/apache/spark/commit/768a8d95ef394553ced64c1fad683127e54e699b).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class LogisticRegressionModel(JavaModel, JavaMLWritable, JavaMLReadable):`
      * `class NaiveBayesModel(JavaModel, JavaMLWritable, JavaMLReadable):`
      * `class KMeansModel(JavaModel, JavaMLWritable, JavaMLReadable):`
      * `class Binarizer(JavaTransformer, HasInputCol, HasOutputCol, JavaMLReadable, JavaMLWritable):`
      * `class Bucketizer(JavaTransformer, HasInputCol, HasOutputCol, JavaMLReadable, JavaMLWritable):`
      * `class CountVectorizer(JavaEstimator, HasInputCol, HasOutputCol, JavaMLReadable, JavaMLWritable):`
      * `class CountVectorizerModel(JavaModel, JavaMLReadable, JavaMLWritable):`
      * `class DCT(JavaTransformer, HasInputCol, HasOutputCol, JavaMLReadable, JavaMLWritable):`
      * `class ElementwiseProduct(JavaTransformer, HasInputCol, HasOutputCol, JavaMLReadable,`
      * `class HashingTF(JavaTransformer, HasInputCol, HasOutputCol, HasNumFeatures, JavaMLReadable,`
      * `class IDF(JavaEstimator, HasInputCol, HasOutputCol, JavaMLReadable, JavaMLWritable):`
      * `class IDFModel(JavaModel, JavaMLReadable, JavaMLWritable):`
      * `class MaxAbsScaler(JavaEstimator, HasInputCol, HasOutputCol, JavaMLReadable, JavaMLWritable):`
      * `class MaxAbsScalerModel(JavaModel, JavaMLReadable, JavaMLWritable):`
      * `class MinMaxScaler(JavaEstimator, HasInputCol, HasOutputCol, JavaMLReadable, JavaMLWritable):`
      * `class MinMaxScalerModel(JavaModel, JavaMLReadable, JavaMLWritable):`
      * `class NGram(JavaTransformer, HasInputCol, HasOutputCol, JavaMLReadable, JavaMLWritable):`
      * `class Normalizer(JavaTransformer, HasInputCol, HasOutputCol, JavaMLReadable, JavaMLWritable):`
      * `class OneHotEncoder(JavaTransformer, HasInputCol, HasOutputCol, JavaMLReadable, JavaMLWritable):`
      * `class PolynomialExpansion(JavaTransformer, HasInputCol, HasOutputCol, JavaMLReadable,`
      * `class QuantileDiscretizer(JavaEstimator, HasInputCol, HasOutputCol, HasSeed, JavaMLReadable,`
      * `class RegexTokenizer(JavaTransformer, HasInputCol, HasOutputCol, JavaMLReadable, JavaMLWritable):`
      * `class SQLTransformer(JavaTransformer, JavaMLReadable, JavaMLWritable):`
      * `class StandardScaler(JavaEstimator, HasInputCol, HasOutputCol, JavaMLReadable, JavaMLWritable):`
      * `class StandardScalerModel(JavaModel, JavaMLReadable, JavaMLWritable):`
      * `class StringIndexer(JavaEstimator, HasInputCol, HasOutputCol, HasHandleInvalid, JavaMLReadable,`
      * `class StringIndexerModel(JavaModel, JavaMLReadable, JavaMLWritable):`
      * `class IndexToString(JavaTransformer, HasInputCol, HasOutputCol, JavaMLReadable, JavaMLWritable):`
      * `class StopWordsRemover(JavaTransformer, HasInputCol, HasOutputCol, JavaMLReadable, JavaMLWritable):`
      * `class Tokenizer(JavaTransformer, HasInputCol, HasOutputCol, JavaMLReadable, JavaMLWritable):`
      * `class VectorAssembler(JavaTransformer, HasInputCols, HasOutputCol, JavaMLReadable, JavaMLWritable):`
      * `class VectorIndexer(JavaEstimator, HasInputCol, HasOutputCol, JavaMLReadable, JavaMLWritable):`
      * `class VectorIndexerModel(JavaModel, JavaMLReadable, JavaMLWritable):`
      * `class VectorSlicer(JavaTransformer, HasInputCol, HasOutputCol, JavaMLReadable, JavaMLWritable):`
      * `class Word2VecModel(JavaModel, JavaMLReadable, JavaMLWritable):`
      * `class PCA(JavaEstimator, HasInputCol, HasOutputCol, JavaMLReadable, JavaMLWritable):`
      * `class PCAModel(JavaModel, JavaMLReadable, JavaMLWritable):`
      * `class RFormula(JavaEstimator, HasFeaturesCol, HasLabelCol, JavaMLReadable, JavaMLWritable):`
      * `class RFormulaModel(JavaModel, JavaMLReadable, JavaMLWritable):`
      * `class ChiSqSelector(JavaEstimator, HasFeaturesCol, HasOutputCol, HasLabelCol, JavaMLReadable,`
      * `class ChiSqSelectorModel(JavaModel, JavaMLReadable, JavaMLWritable):`
      * `class PipelineMLWriter(JavaMLWriter):`
      * `class Pipeline(Estimator, MLReadable, MLWritable):`
      * `class PipelineModelMLWriter(JavaMLWriter):`
      * `class PipelineModel(Model, MLReadable, MLWritable):`
      * `class ALSModel(JavaModel, JavaMLWritable, JavaMLReadable):`
      * `class LinearRegressionModel(JavaModel, JavaMLWritable, JavaMLReadable):`
      * `class IsotonicRegressionModel(JavaModel, JavaMLWritable, JavaMLReadable):`
      * `class AFTSurvivalRegressionModel(JavaModel, JavaMLWritable, JavaMLReadable):`
      * `class MLWriter(object):`
      * `class JavaMLWriter(MLWriter):`
      * `class JavaMLWritable(MLWritable):`
      * `class MLReader(object):`
      * `class JavaMLReader(MLReader):`
      * `class JavaMLReadable(MLReadable):`
      * `  implicit class StringToColumn(val sc: StringContext) `
      * `class RecordReaderIterator[T](rowReader: RecordReader[_, T]) extends Iterator[T] `
      * `        // the type in next() and we get a class cast exception.  If we make that function return`
      * `class HDFSMetadataLog[T: ClassTag](sqlContext: SQLContext, path: String)`
      * `class StreamProgress(`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13980] Incrementally serialize blocks w...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11791#discussion_r56915182
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
    @@ -244,13 +244,113 @@ private[spark] class MemoryStore(
         }
       }
     
    +  /**
    +   * Attempt to put the given block in memory store as bytes.
    +   *
    +   * It's possible that the iterator is too large to materialize and store in memory. To avoid
    +   * OOM exceptions, this method will gradually unroll the iterator while periodically checking
    +   * whether there is enough free memory. If the block is successfully materialized, then the
    +   * temporary unroll memory used during the materialization is "transferred" to storage memory,
    +   * so we won't acquire more memory than is actually needed to store the block.
    +   *
    +   * @return in case of success, the estimated the estimated size of the stored data. In case of
    +   *         failure, return a handle which allows the caller to either finish the serialization
    +   *         by spilling to disk or to deserialize the partially-serialized block and reconstruct
    +   *         the original input iterator. The caller must either fully consume this result
    +   *         iterator or call `discard()` on it in order to free the storage memory consumed by the
    +   *         partially-unrolled block.
    +   */
    +  private[storage] def putIteratorAsBytes(
    +      blockId: BlockId,
    +      values: Iterator[Any]): Either[PartiallySerializedBlock, Long] = {
    +
    +    require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
    +
    +    // Whether there is still enough memory for us to continue unrolling this block
    +    var keepUnrolling = true
    +    // Initial per-task memory to request for unrolling blocks (bytes).
    +    val initialMemoryThreshold = unrollMemoryThreshold
    +    // Keep track of unroll memory used by this particular block / putIterator() operation
    +    var unrollMemoryUsedByThisBlock = 0L
    +    // Underlying buffer for unrolling the block
    +    val redirectableStream = new RedirectableOutputStream
    +    val byteArrayChunkOutputStream = new ByteArrayChunkOutputStream(initialMemoryThreshold.toInt)
    +    redirectableStream.setOutputStream(byteArrayChunkOutputStream)
    +    val serializationStream: SerializationStream = {
    +      val ser = blockManager.defaultSerializer.newInstance()
    +      ser.serializeStream(blockManager.wrapForCompression(blockId, redirectableStream))
    +    }
    +
    +    // Request enough memory to begin unrolling
    +    keepUnrolling = reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold)
    +
    +    if (!keepUnrolling) {
    +      logWarning(s"Failed to reserve initial memory threshold of " +
    +        s"${Utils.bytesToString(initialMemoryThreshold)} for computing block $blockId in memory.")
    +    } else {
    +      unrollMemoryUsedByThisBlock += initialMemoryThreshold
    +    }
    +
    +    def reserveAdditionalMemoryIfNecessary(): Unit = {
    +      if (byteArrayChunkOutputStream.size > unrollMemoryUsedByThisBlock) {
    --- End diff --
    
    One important implicit assumption which I will make explicit in a line comment: we assume that we'll always be able to get enough memory to unroll at least one element in between size calculation. This is the same assumption that we have in the deserialized case, since we only periodically measure memory usage there.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13980] Incrementally serialize blocks w...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on the pull request:

    https://github.com/apache/spark/pull/11791#issuecomment-201034418
  
    Jenkins, retest this please.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13980] Incrementally serialize blocks w...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on the pull request:

    https://github.com/apache/spark/pull/11791#issuecomment-200931224
  
    Alright, just added a few more tests to `MemoryStoreSuite` to bump up the coverage of `putBytesAsIterator()` and fixed a problem leading to leaked unroll memory in `PartiallySerializedResult.finishWritingToStream`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13980] Incrementally serialize blocks w...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11791#issuecomment-200535298
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/53957/
    Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13980] Incrementally serialize blocks w...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11791#issuecomment-199438219
  
    **[Test build #53704 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/53704/consoleFull)** for PR 11791 at commit [`a336c17`](https://github.com/apache/spark/commit/a336c177fec2cca5157619524c597032ef2212aa).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13980] Incrementally serialize blocks w...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11791#discussion_r56914947
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
    @@ -244,13 +244,113 @@ private[spark] class MemoryStore(
         }
       }
     
    +  /**
    +   * Attempt to put the given block in memory store as bytes.
    +   *
    +   * It's possible that the iterator is too large to materialize and store in memory. To avoid
    +   * OOM exceptions, this method will gradually unroll the iterator while periodically checking
    +   * whether there is enough free memory. If the block is successfully materialized, then the
    +   * temporary unroll memory used during the materialization is "transferred" to storage memory,
    +   * so we won't acquire more memory than is actually needed to store the block.
    +   *
    +   * @return in case of success, the estimated the estimated size of the stored data. In case of
    +   *         failure, return a handle which allows the caller to either finish the serialization
    +   *         by spilling to disk or to deserialize the partially-serialized block and reconstruct
    +   *         the original input iterator. The caller must either fully consume this result
    +   *         iterator or call `discard()` on it in order to free the storage memory consumed by the
    +   *         partially-unrolled block.
    +   */
    +  private[storage] def putIteratorAsBytes(
    +      blockId: BlockId,
    +      values: Iterator[Any]): Either[PartiallySerializedBlock, Long] = {
    +
    +    require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
    +
    +    // Whether there is still enough memory for us to continue unrolling this block
    +    var keepUnrolling = true
    +    // Initial per-task memory to request for unrolling blocks (bytes).
    +    val initialMemoryThreshold = unrollMemoryThreshold
    +    // Keep track of unroll memory used by this particular block / putIterator() operation
    +    var unrollMemoryUsedByThisBlock = 0L
    +    // Underlying buffer for unrolling the block
    +    val redirectableStream = new RedirectableOutputStream
    +    val byteArrayChunkOutputStream = new ByteArrayChunkOutputStream(initialMemoryThreshold.toInt)
    +    redirectableStream.setOutputStream(byteArrayChunkOutputStream)
    +    val serializationStream: SerializationStream = {
    +      val ser = blockManager.defaultSerializer.newInstance()
    +      ser.serializeStream(blockManager.wrapForCompression(blockId, redirectableStream))
    +    }
    +
    +    // Request enough memory to begin unrolling
    +    keepUnrolling = reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold)
    +
    +    if (!keepUnrolling) {
    +      logWarning(s"Failed to reserve initial memory threshold of " +
    +        s"${Utils.bytesToString(initialMemoryThreshold)} for computing block $blockId in memory.")
    +    } else {
    +      unrollMemoryUsedByThisBlock += initialMemoryThreshold
    +    }
    +
    +    def reserveAdditionalMemoryIfNecessary(): Unit = {
    +      if (byteArrayChunkOutputStream.size > unrollMemoryUsedByThisBlock) {
    +        val amountToRequest = byteArrayChunkOutputStream.size - unrollMemoryUsedByThisBlock
    +        keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest)
    +        if (keepUnrolling) {
    +          unrollMemoryUsedByThisBlock += amountToRequest
    +        }
    +        unrollMemoryUsedByThisBlock += amountToRequest
    +      }
    +    }
    +
    +    // Unroll this block safely, checking whether we have exceeded our threshold
    +    while (values.hasNext && keepUnrolling) {
    +      serializationStream.writeObject(values.next())
    +      reserveAdditionalMemoryIfNecessary()
    +    }
    +
    +    if (keepUnrolling) {
    --- End diff --
    
    This is actually here on purpose and deserves a comment. The goal here is to make sure that once we reach line 317 we are guaranteed to have enough memory to store the block. When we finish serializing the block and reach line 311, it's possible that the actual memory usage has exceeded our unroll memory slightly, so here we do one final bumping up of the unroll memory.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13980][WIP] Incrementally serialize blo...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11791#issuecomment-198087062
  
    Merged build finished. Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13980] Incrementally serialize blocks w...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11791#discussion_r56875075
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
    @@ -500,3 +601,79 @@ private[storage] class PartiallyUnrolledIterator(
         iter = null
       }
     }
    +
    +/**
    + * A wrapper which allows an open [[OutputStream]] to be redirected to a different sink.
    + */
    +private class RedirectableOutputStream extends OutputStream {
    +  private[this] var os: OutputStream = _
    +  def setOutputStream(s: OutputStream): Unit = { os = s }
    +  override def write(b: Int): Unit = os.write(b)
    +  override def write(b: Array[Byte]): Unit = os.write(b)
    +  override def write(b: Array[Byte], off: Int, len: Int): Unit = os.write(b, off, len)
    +  override def flush(): Unit = os.flush()
    +  override def close(): Unit = os.close()
    +}
    +
    +/**
    + * The result of a failed [[MemoryStore.putIteratorAsBytes()]] call.
    + *
    + * @param memoryStore the MemoryStore, used for freeing memory.
    + * @param blockManager the BlockManager, used for deserializing values.
    + * @param blockId the block id.
    + * @param serializationStream a serialization stream which writes to [[redirectableOutputStream]].
    + * @param redirectableOutputStream an OutputStream which can be redirected to a different sink.
    + * @param unrollMemory the amount of unroll memory used by the values in `unrolled`.
    + * @param unrolled a byte buffer containing the partially-serialized values.
    + * @param rest         the rest of the original iterator passed to
    + *                     [[MemoryStore.putIteratorAsValues()]].
    + */
    +private[storage] class PartiallySerializedBlock(
    +    memoryStore: MemoryStore,
    +    blockManager: BlockManager,
    +    blockId: BlockId,
    +    serializationStream: SerializationStream,
    +    redirectableOutputStream: RedirectableOutputStream,
    +    unrollMemory: Long,
    +    unrolled: ChunkedByteBuffer,
    +    rest: Iterator[Any]) {
    +
    +  /**
    +   * Called to dispose of this block and free its memory.
    +   */
    +  def discard(): Unit = {
    +    try {
    +      serializationStream.close()
    +    } finally {
    +      memoryStore.releaseUnrollMemoryForThisTask(unrollMemory)
    +    }
    +  }
    +
    +  /**
    +   * Finish writing this block to the given output stream by first writing the serialized values
    +   * and then serializing the values from the original input iterator.
    +   */
    +  def finishWritingToStream(os: OutputStream): Unit = {
    +    ByteStreams.copy(unrolled.toInputStream(), os)
    +    redirectableOutputStream.setOutputStream(os)
    +    while (rest.hasNext) {
    +      serializationStream.writeObject(rest.next())
    +    }
    +    serializationStream.close()
    +  }
    +
    +  /**
    +   * Returns an iterator over the values in this block by first deserializing the serialized
    +   * values and then consuming the rest of the original input iterator.
    +   *
    +   * If the caller does not plan to fully consume the resulting iterator then they must call
    +   * `close()` on it to free its resources.
    +   */
    +  def valuesIterator: PartiallyUnrolledIterator = {
    +    new PartiallyUnrolledIterator(
    +      memoryStore,
    +      unrollMemory,
    +      unrolled = blockManager.dataDeserialize(blockId, unrolled),
    --- End diff --
    
    It's kind of a bummer that we need to pass `blockManager` in here when all that we want to do is figure out which type of decompression and deserialization needs to be performed. It'd be nice to loosen coupling here in a future followup patch.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13980] Incrementally serialize blocks w...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11791#issuecomment-199424706
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/53700/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13980] Incrementally serialize blocks w...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11791#issuecomment-201035834
  
    **[Test build #54094 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/54094/consoleFull)** for PR 11791 at commit [`749df73`](https://github.com/apache/spark/commit/749df73c8dee8733a2617ff9ab8563fe3ed9de66).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13980] Incrementally serialize blocks w...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on the pull request:

    https://github.com/apache/spark/pull/11791#issuecomment-201067234
  
    Looks good.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13980] Incrementally serialize blocks w...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11791#issuecomment-201076459
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/54094/
    Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13980] Incrementally serialize blocks w...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11791#discussion_r56874816
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
    @@ -129,10 +136,9 @@ private[spark] class MemoryStore(
        *         iterator or call `close()` on it in order to free the storage memory consumed by the
        *         partially-unrolled block.
        */
    -  private[storage] def putIterator(
    +  private[storage] def putIteratorAsValues(
    --- End diff --
    
    In case it isn't obvious from the diff, the main change in this file is to split `putIterator` into two separate methods, `putIteratorAsValues` and `putIteratorAsBytes`.
    
    It's possible that there's some opportunity to reduce code duplication here, but unless we can come up with an obvious and simple approach I would prefer to defer cleanup to followup patches.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13980] Incrementally serialize blocks w...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11791#issuecomment-199604436
  
    **[Test build #53741 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/53741/consoleFull)** for PR 11791 at commit [`cec1f02`](https://github.com/apache/spark/commit/cec1f02b36a0a52f977b800847d9a182b2958ff0).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13980] Incrementally serialize blocks w...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11791#issuecomment-199498612
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/53704/
    Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13980] Incrementally serialize blocks w...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11791#issuecomment-199606300
  
    **[Test build #53742 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/53742/consoleFull)** for PR 11791 at commit [`4976b74`](https://github.com/apache/spark/commit/4976b743c8ac8eda86b548ea6a858d9c6f164591).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13980][WIP] Incrementally serialize blo...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11791#issuecomment-198218345
  
    **[Test build #53496 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/53496/consoleFull)** for PR 11791 at commit [`5489748`](https://github.com/apache/spark/commit/5489748d7e8b0d4aa7a0e7331a1a4a02f65d977f).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13980] Incrementally serialize blocks w...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11791#issuecomment-200984924
  
    **[Test build #54070 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/54070/consoleFull)** for PR 11791 at commit [`749df73`](https://github.com/apache/spark/commit/749df73c8dee8733a2617ff9ab8563fe3ed9de66).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13980] Incrementally serialize blocks w...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11791#issuecomment-199498303
  
    **[Test build #53704 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/53704/consoleFull)** for PR 11791 at commit [`a336c17`](https://github.com/apache/spark/commit/a336c177fec2cca5157619524c597032ef2212aa).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13980] Incrementally serialize blocks w...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11791#issuecomment-200980997
  
    **[Test build #54070 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/54070/consoleFull)** for PR 11791 at commit [`749df73`](https://github.com/apache/spark/commit/749df73c8dee8733a2617ff9ab8563fe3ed9de66).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13980] Incrementally serialize blocks w...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11791#discussion_r57391696
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
    @@ -279,13 +275,117 @@ private[spark] class MemoryStore(
         }
       }
     
    +  /**
    +   * Attempt to put the given block in memory store as bytes.
    +   *
    +   * It's possible that the iterator is too large to materialize and store in memory. To avoid
    +   * OOM exceptions, this method will gradually unroll the iterator while periodically checking
    +   * whether there is enough free memory. If the block is successfully materialized, then the
    +   * temporary unroll memory used during the materialization is "transferred" to storage memory,
    +   * so we won't acquire more memory than is actually needed to store the block.
    +   *
    +   * @return in case of success, the estimated the estimated size of the stored data. In case of
    +   *         failure, return a handle which allows the caller to either finish the serialization
    +   *         by spilling to disk or to deserialize the partially-serialized block and reconstruct
    +   *         the original input iterator. The caller must either fully consume this result
    +   *         iterator or call `discard()` on it in order to free the storage memory consumed by the
    +   *         partially-unrolled block.
    +   */
    +  private[storage] def putIteratorAsBytes[T](
    +      blockId: BlockId,
    +      values: Iterator[T],
    +      classTag: ClassTag[T]): Either[PartiallySerializedBlock[T], Long] = {
    +
    +    require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
    +
    +    // Whether there is still enough memory for us to continue unrolling this block
    +    var keepUnrolling = true
    +    // Initial per-task memory to request for unrolling blocks (bytes).
    +    val initialMemoryThreshold = unrollMemoryThreshold
    +    // Keep track of unroll memory used by this particular block / putIterator() operation
    +    var unrollMemoryUsedByThisBlock = 0L
    +    // Underlying buffer for unrolling the block
    +    val redirectableStream = new RedirectableOutputStream
    +    val byteArrayChunkOutputStream = new ByteArrayChunkOutputStream(initialMemoryThreshold.toInt)
    +    redirectableStream.setOutputStream(byteArrayChunkOutputStream)
    +    val serializationStream: SerializationStream = {
    +      val ser = serializerManager.getSerializer(classTag).newInstance()
    +      ser.serializeStream(serializerManager.wrapForCompression(blockId, redirectableStream))
    +    }
    +
    +    // Request enough memory to begin unrolling
    +    keepUnrolling = reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold)
    +
    +    if (!keepUnrolling) {
    +      logWarning(s"Failed to reserve initial memory threshold of " +
    +        s"${Utils.bytesToString(initialMemoryThreshold)} for computing block $blockId in memory.")
    +    } else {
    +      unrollMemoryUsedByThisBlock += initialMemoryThreshold
    +    }
    +
    +    def reserveAdditionalMemoryIfNecessary(): Unit = {
    +      if (byteArrayChunkOutputStream.size > unrollMemoryUsedByThisBlock) {
    +        val amountToRequest = byteArrayChunkOutputStream.size - unrollMemoryUsedByThisBlock
    +        keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest)
    +        if (keepUnrolling) {
    +          unrollMemoryUsedByThisBlock += amountToRequest
    +        }
    +      }
    +    }
    +
    +    // Unroll this block safely, checking whether we have exceeded our threshold
    +    while (values.hasNext && keepUnrolling) {
    +      serializationStream.writeObject(values.next())(classTag)
    +      reserveAdditionalMemoryIfNecessary()
    +    }
    +
    +    // Make sure that we have enough memory to store the block. By this point, it is possible that
    +    // the block's actual memory usage has exceeded the unroll memory by a small amount, so we
    +    // perform one final call to attempt to allocate additional memory if necessary.
    --- End diff --
    
    Yes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13980] Incrementally serialize blocks w...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11791#discussion_r57394680
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -876,21 +876,40 @@ private[spark] class BlockManager(
           if (level.useMemory) {
             // Put it in memory first, even if it also has useDisk set to true;
             // We will drop it to disk later if the memory store can't hold it.
    -        memoryStore.putIterator(blockId, iterator(), level, classTag) match {
    -          case Right(s) =>
    -            size = s
    -          case Left(iter) =>
    -            // Not enough space to unroll this block; drop to disk if applicable
    -            if (level.useDisk) {
    -              logWarning(s"Persisting block $blockId to disk instead.")
    -              diskStore.put(blockId) { fileOutputStream =>
    -                serializerManager.dataSerializeStream(blockId, fileOutputStream, iter)(classTag)
    +        if (level.deserialized) {
    +          memoryStore.putIteratorAsValues(blockId, iterator(), classTag) match {
    +            case Right(s) =>
    +              size = s
    +            case Left(iter) =>
    +              // Not enough space to unroll this block; drop to disk if applicable
    +              if (level.useDisk) {
    +                logWarning(s"Persisting block $blockId to disk instead.")
    +                diskStore.put(blockId) { fileOutputStream =>
    +                  serializerManager.dataSerializeStream(blockId, fileOutputStream, iter)(classTag)
    +                }
    +                size = diskStore.getSize(blockId)
    +              } else {
    +                iteratorFromFailedMemoryStorePut = Some(iter)
                   }
    -              size = diskStore.getSize(blockId)
    -            } else {
    -              iteratorFromFailedMemoryStorePut = Some(iter)
    -            }
    +          }
    +        } else { // !level.deserialized
    +          memoryStore.putIteratorAsBytes(blockId, iterator(), classTag) match {
    +            case Right(s) =>
    +              size = s
    +            case Left(partiallySerializedValues) =>
    +              // Not enough space to unroll this block; drop to disk if applicable
    +              if (level.useDisk) {
    +                logWarning(s"Persisting block $blockId to disk instead.")
    +                diskStore.put(blockId) { fileOutputStream =>
    +                  partiallySerializedValues.finishWritingToStream(fileOutputStream)
    +                }
    +                size = diskStore.getSize(blockId)
    +              } else {
    +                iteratorFromFailedMemoryStorePut = Some(partiallySerializedValues.valuesIterator)
    +              }
    --- End diff --
    
    duplicate code >:(


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13980] Incrementally serialize blocks w...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11791#issuecomment-200978545
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/54057/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13980] Incrementally serialize blocks w...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11791#discussion_r57395328
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -876,21 +876,40 @@ private[spark] class BlockManager(
           if (level.useMemory) {
             // Put it in memory first, even if it also has useDisk set to true;
             // We will drop it to disk later if the memory store can't hold it.
    -        memoryStore.putIterator(blockId, iterator(), level, classTag) match {
    -          case Right(s) =>
    -            size = s
    -          case Left(iter) =>
    -            // Not enough space to unroll this block; drop to disk if applicable
    -            if (level.useDisk) {
    -              logWarning(s"Persisting block $blockId to disk instead.")
    -              diskStore.put(blockId) { fileOutputStream =>
    -                serializerManager.dataSerializeStream(blockId, fileOutputStream, iter)(classTag)
    +        if (level.deserialized) {
    +          memoryStore.putIteratorAsValues(blockId, iterator(), classTag) match {
    +            case Right(s) =>
    +              size = s
    +            case Left(iter) =>
    +              // Not enough space to unroll this block; drop to disk if applicable
    +              if (level.useDisk) {
    +                logWarning(s"Persisting block $blockId to disk instead.")
    +                diskStore.put(blockId) { fileOutputStream =>
    +                  serializerManager.dataSerializeStream(blockId, fileOutputStream, iter)(classTag)
    +                }
    +                size = diskStore.getSize(blockId)
    +              } else {
    +                iteratorFromFailedMemoryStorePut = Some(iter)
                   }
    -              size = diskStore.getSize(blockId)
    -            } else {
    -              iteratorFromFailedMemoryStorePut = Some(iter)
    -            }
    +          }
    +        } else { // !level.deserialized
    +          memoryStore.putIteratorAsBytes(blockId, iterator(), classTag) match {
    +            case Right(s) =>
    +              size = s
    +            case Left(partiallySerializedValues) =>
    +              // Not enough space to unroll this block; drop to disk if applicable
    +              if (level.useDisk) {
    +                logWarning(s"Persisting block $blockId to disk instead.")
    +                diskStore.put(blockId) { fileOutputStream =>
    +                  partiallySerializedValues.finishWritingToStream(fileOutputStream)
    +                }
    +                size = diskStore.getSize(blockId)
    +              } else {
    +                iteratorFromFailedMemoryStorePut = Some(partiallySerializedValues.valuesIterator)
    +              }
    +          }
    --- End diff --
    
    about my previous comment about duplicate code, never mind. It can't actually be abstracted cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13980] Incrementally serialize blocks w...

Posted by nongli <gi...@git.apache.org>.
Github user nongli commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11791#discussion_r56916760
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
    @@ -500,3 +601,79 @@ private[storage] class PartiallyUnrolledIterator(
         iter = null
       }
     }
    +
    +/**
    + * A wrapper which allows an open [[OutputStream]] to be redirected to a different sink.
    + */
    +private class RedirectableOutputStream extends OutputStream {
    +  private[this] var os: OutputStream = _
    +  def setOutputStream(s: OutputStream): Unit = { os = s }
    +  override def write(b: Int): Unit = os.write(b)
    +  override def write(b: Array[Byte]): Unit = os.write(b)
    +  override def write(b: Array[Byte], off: Int, len: Int): Unit = os.write(b, off, len)
    +  override def flush(): Unit = os.flush()
    +  override def close(): Unit = os.close()
    +}
    +
    +/**
    + * The result of a failed [[MemoryStore.putIteratorAsBytes()]] call.
    + *
    + * @param memoryStore the MemoryStore, used for freeing memory.
    + * @param blockManager the BlockManager, used for deserializing values.
    + * @param blockId the block id.
    + * @param serializationStream a serialization stream which writes to [[redirectableOutputStream]].
    + * @param redirectableOutputStream an OutputStream which can be redirected to a different sink.
    + * @param unrollMemory the amount of unroll memory used by the values in `unrolled`.
    + * @param unrolled a byte buffer containing the partially-serialized values.
    + * @param rest         the rest of the original iterator passed to
    + *                     [[MemoryStore.putIteratorAsValues()]].
    + */
    +private[storage] class PartiallySerializedBlock(
    +    memoryStore: MemoryStore,
    +    blockManager: BlockManager,
    +    blockId: BlockId,
    +    serializationStream: SerializationStream,
    +    redirectableOutputStream: RedirectableOutputStream,
    +    unrollMemory: Long,
    +    unrolled: ChunkedByteBuffer,
    +    rest: Iterator[Any]) {
    +
    +  /**
    +   * Called to dispose of this block and free its memory.
    +   */
    +  def discard(): Unit = {
    --- End diff --
    
    who calls this now?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13980] Incrementally serialize blocks w...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11791#issuecomment-199424660
  
    **[Test build #53700 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/53700/consoleFull)** for PR 11791 at commit [`a336c17`](https://github.com/apache/spark/commit/a336c177fec2cca5157619524c597032ef2212aa).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13980] Incrementally serialize blocks w...

Posted by nongli <gi...@git.apache.org>.
Github user nongli commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11791#discussion_r56916037
  
    --- Diff: core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala ---
    @@ -1074,7 +1074,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
         assert(memoryStore.currentUnrollMemoryForThisTask === 0)
     
         // Unroll with all the space in the world. This should succeed.
    -    var putResult = memoryStore.putIterator("unroll", smallList.iterator, StorageLevel.MEMORY_ONLY)
    +    var putResult = memoryStore.putIteratorAsValues("unroll", smallList.iterator)
    --- End diff --
    
    Do we need more test cases for putIteratorAsBytes?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13980] Incrementally serialize blocks w...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11791#discussion_r57401730
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
    @@ -279,13 +275,117 @@ private[spark] class MemoryStore(
         }
       }
     
    +  /**
    +   * Attempt to put the given block in memory store as bytes.
    +   *
    +   * It's possible that the iterator is too large to materialize and store in memory. To avoid
    +   * OOM exceptions, this method will gradually unroll the iterator while periodically checking
    +   * whether there is enough free memory. If the block is successfully materialized, then the
    +   * temporary unroll memory used during the materialization is "transferred" to storage memory,
    +   * so we won't acquire more memory than is actually needed to store the block.
    +   *
    +   * @return in case of success, the estimated the estimated size of the stored data. In case of
    +   *         failure, return a handle which allows the caller to either finish the serialization
    +   *         by spilling to disk or to deserialize the partially-serialized block and reconstruct
    +   *         the original input iterator. The caller must either fully consume this result
    +   *         iterator or call `discard()` on it in order to free the storage memory consumed by the
    +   *         partially-unrolled block.
    +   */
    +  private[storage] def putIteratorAsBytes[T](
    +      blockId: BlockId,
    +      values: Iterator[T],
    +      classTag: ClassTag[T]): Either[PartiallySerializedBlock[T], Long] = {
    +
    +    require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
    +
    +    // Whether there is still enough memory for us to continue unrolling this block
    +    var keepUnrolling = true
    +    // Initial per-task memory to request for unrolling blocks (bytes).
    +    val initialMemoryThreshold = unrollMemoryThreshold
    +    // Keep track of unroll memory used by this particular block / putIterator() operation
    +    var unrollMemoryUsedByThisBlock = 0L
    +    // Underlying buffer for unrolling the block
    +    val redirectableStream = new RedirectableOutputStream
    +    val byteArrayChunkOutputStream = new ByteArrayChunkOutputStream(initialMemoryThreshold.toInt)
    +    redirectableStream.setOutputStream(byteArrayChunkOutputStream)
    +    val serializationStream: SerializationStream = {
    +      val ser = serializerManager.getSerializer(classTag).newInstance()
    +      ser.serializeStream(serializerManager.wrapForCompression(blockId, redirectableStream))
    +    }
    +
    +    // Request enough memory to begin unrolling
    +    keepUnrolling = reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold)
    +
    +    if (!keepUnrolling) {
    +      logWarning(s"Failed to reserve initial memory threshold of " +
    +        s"${Utils.bytesToString(initialMemoryThreshold)} for computing block $blockId in memory.")
    +    } else {
    +      unrollMemoryUsedByThisBlock += initialMemoryThreshold
    +    }
    +
    +    def reserveAdditionalMemoryIfNecessary(): Unit = {
    +      if (byteArrayChunkOutputStream.size > unrollMemoryUsedByThisBlock) {
    +        val amountToRequest = byteArrayChunkOutputStream.size - unrollMemoryUsedByThisBlock
    +        keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest)
    +        if (keepUnrolling) {
    +          unrollMemoryUsedByThisBlock += amountToRequest
    +        }
    +      }
    +    }
    +
    +    // Unroll this block safely, checking whether we have exceeded our threshold
    +    while (values.hasNext && keepUnrolling) {
    +      serializationStream.writeObject(values.next())(classTag)
    +      reserveAdditionalMemoryIfNecessary()
    +    }
    +
    +    // Make sure that we have enough memory to store the block. By this point, it is possible that
    +    // the block's actual memory usage has exceeded the unroll memory by a small amount, so we
    +    // perform one final call to attempt to allocate additional memory if necessary.
    +    if (keepUnrolling) {
    +      serializationStream.close()
    +      reserveAdditionalMemoryIfNecessary()
    +    }
    +
    +    if (keepUnrolling) {
    --- End diff --
    
    I see...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13980][WIP] Incrementally serialize blo...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11791#issuecomment-198086746
  
    **[Test build #53456 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/53456/consoleFull)** for PR 11791 at commit [`7dc3623`](https://github.com/apache/spark/commit/7dc362331f3f549670ecd9488db456b4136a3ad7).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13980] Incrementally serialize blocks w...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11791#discussion_r56917008
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
    @@ -500,3 +601,79 @@ private[storage] class PartiallyUnrolledIterator(
         iter = null
       }
     }
    +
    +/**
    + * A wrapper which allows an open [[OutputStream]] to be redirected to a different sink.
    + */
    +private class RedirectableOutputStream extends OutputStream {
    +  private[this] var os: OutputStream = _
    +  def setOutputStream(s: OutputStream): Unit = { os = s }
    +  override def write(b: Int): Unit = os.write(b)
    +  override def write(b: Array[Byte]): Unit = os.write(b)
    +  override def write(b: Array[Byte], off: Int, len: Int): Unit = os.write(b, off, len)
    +  override def flush(): Unit = os.flush()
    +  override def close(): Unit = os.close()
    +}
    +
    +/**
    + * The result of a failed [[MemoryStore.putIteratorAsBytes()]] call.
    + *
    + * @param memoryStore the MemoryStore, used for freeing memory.
    + * @param blockManager the BlockManager, used for deserializing values.
    + * @param blockId the block id.
    + * @param serializationStream a serialization stream which writes to [[redirectableOutputStream]].
    + * @param redirectableOutputStream an OutputStream which can be redirected to a different sink.
    + * @param unrollMemory the amount of unroll memory used by the values in `unrolled`.
    + * @param unrolled a byte buffer containing the partially-serialized values.
    + * @param rest         the rest of the original iterator passed to
    + *                     [[MemoryStore.putIteratorAsValues()]].
    + */
    +private[storage] class PartiallySerializedBlock(
    +    memoryStore: MemoryStore,
    +    blockManager: BlockManager,
    +    blockId: BlockId,
    +    serializationStream: SerializationStream,
    +    redirectableOutputStream: RedirectableOutputStream,
    +    unrollMemory: Long,
    +    unrolled: ChunkedByteBuffer,
    +    rest: Iterator[Any]) {
    +
    +  /**
    +   * Called to dispose of this block and free its memory.
    +   */
    +  def discard(): Unit = {
    --- End diff --
    
    It's not actually called in the current code, since it's only currently called in one place, `BlockManager.doPutIterator()`, which returns a `PartiallyUnrolledIterator`, so that sole callsite ends up calling `valuesIterator()`, which takes care of the discarding.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13980] Incrementally serialize blocks w...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11791#issuecomment-200984957
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/54070/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13980] Incrementally serialize blocks w...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11791#issuecomment-200535295
  
    Merged build finished. Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13980] Incrementally serialize blocks w...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11791#issuecomment-199642138
  
    Merged build finished. Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13980][WIP] Incrementally serialize blo...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on the pull request:

    https://github.com/apache/spark/pull/11791#issuecomment-198584688
  
    /cc @rxin @andrewor14, this is the next most important patch to review towards off-heap caching. After these changes get in, we'll be able to use off-heap memory for the unroll memory in off-heap caching, greatly simplifying things. Without this change, the on-heap unroll array needs to be accounted properly even if the final cache destination is off-heap, making the caching more OOM-prone and complicating the accounting logic (since it then becomes different between the two modes).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13980] Incrementally serialize blocks w...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11791#issuecomment-200984952
  
    Merged build finished. Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13980] Incrementally serialize blocks w...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11791#discussion_r57401356
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
    @@ -544,3 +645,81 @@ private[storage] class PartiallyUnrolledIterator[T](
         iter = null
       }
     }
    +
    +/**
    + * A wrapper which allows an open [[OutputStream]] to be redirected to a different sink.
    + */
    +private class RedirectableOutputStream extends OutputStream {
    +  private[this] var os: OutputStream = _
    +  def setOutputStream(s: OutputStream): Unit = { os = s }
    --- End diff --
    
    Do we need a whole new class for this? Can we just write (failed stream + the rest of input stream) to the file when we drop it to disk?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13980][WIP] Incrementally serialize blo...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on the pull request:

    https://github.com/apache/spark/pull/11791#issuecomment-198648221
  
    Still WIP?



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13980] Incrementally serialize blocks w...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11791#discussion_r56875380
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
    @@ -500,3 +601,79 @@ private[storage] class PartiallyUnrolledIterator(
         iter = null
       }
     }
    +
    +/**
    + * A wrapper which allows an open [[OutputStream]] to be redirected to a different sink.
    + */
    +private class RedirectableOutputStream extends OutputStream {
    +  private[this] var os: OutputStream = _
    +  def setOutputStream(s: OutputStream): Unit = { os = s }
    +  override def write(b: Int): Unit = os.write(b)
    +  override def write(b: Array[Byte]): Unit = os.write(b)
    +  override def write(b: Array[Byte], off: Int, len: Int): Unit = os.write(b, off, len)
    +  override def flush(): Unit = os.flush()
    +  override def close(): Unit = os.close()
    +}
    +
    +/**
    + * The result of a failed [[MemoryStore.putIteratorAsBytes()]] call.
    + *
    + * @param memoryStore the MemoryStore, used for freeing memory.
    + * @param blockManager the BlockManager, used for deserializing values.
    + * @param blockId the block id.
    + * @param serializationStream a serialization stream which writes to [[redirectableOutputStream]].
    + * @param redirectableOutputStream an OutputStream which can be redirected to a different sink.
    + * @param unrollMemory the amount of unroll memory used by the values in `unrolled`.
    + * @param unrolled a byte buffer containing the partially-serialized values.
    + * @param rest         the rest of the original iterator passed to
    + *                     [[MemoryStore.putIteratorAsValues()]].
    + */
    +private[storage] class PartiallySerializedBlock(
    +    memoryStore: MemoryStore,
    +    blockManager: BlockManager,
    +    blockId: BlockId,
    +    serializationStream: SerializationStream,
    +    redirectableOutputStream: RedirectableOutputStream,
    +    unrollMemory: Long,
    +    unrolled: ChunkedByteBuffer,
    +    rest: Iterator[Any]) {
    +
    +  /**
    +   * Called to dispose of this block and free its memory.
    +   */
    +  def discard(): Unit = {
    +    try {
    +      serializationStream.close()
    +    } finally {
    +      memoryStore.releaseUnrollMemoryForThisTask(unrollMemory)
    +    }
    +  }
    +
    +  /**
    +   * Finish writing this block to the given output stream by first writing the serialized values
    +   * and then serializing the values from the original input iterator.
    +   */
    +  def finishWritingToStream(os: OutputStream): Unit = {
    +    ByteStreams.copy(unrolled.toInputStream(), os)
    +    redirectableOutputStream.setOutputStream(os)
    +    while (rest.hasNext) {
    +      serializationStream.writeObject(rest.next())
    +    }
    +    serializationStream.close()
    +  }
    +
    +  /**
    +   * Returns an iterator over the values in this block by first deserializing the serialized
    +   * values and then consuming the rest of the original input iterator.
    +   *
    +   * If the caller does not plan to fully consume the resulting iterator then they must call
    +   * `close()` on it to free its resources.
    +   */
    +  def valuesIterator: PartiallyUnrolledIterator = {
    +    new PartiallyUnrolledIterator(
    +      memoryStore,
    +      unrollMemory,
    +      unrolled = blockManager.dataDeserialize(blockId, unrolled),
    --- End diff --
    
    Also, note that the changes in this patch are going to moderately conflict with my other patch for auto-selection of caching serializers, since we'll then need to make sure that the proper classTags are propagated here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13980] Incrementally serialize blocks w...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11791#issuecomment-200932092
  
    **[Test build #54057 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/54057/consoleFull)** for PR 11791 at commit [`749df73`](https://github.com/apache/spark/commit/749df73c8dee8733a2617ff9ab8563fe3ed9de66).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13980] Incrementally serialize blocks w...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11791#issuecomment-199498607
  
    Merged build finished. Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13980] Incrementally serialize blocks w...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on the pull request:

    https://github.com/apache/spark/pull/11791#issuecomment-199577457
  
    cc @sameeragarwal 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13980] Incrementally serialize blocks w...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11791#discussion_r56914534
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
    @@ -244,13 +244,113 @@ private[spark] class MemoryStore(
         }
       }
     
    +  /**
    +   * Attempt to put the given block in memory store as bytes.
    +   *
    +   * It's possible that the iterator is too large to materialize and store in memory. To avoid
    +   * OOM exceptions, this method will gradually unroll the iterator while periodically checking
    +   * whether there is enough free memory. If the block is successfully materialized, then the
    +   * temporary unroll memory used during the materialization is "transferred" to storage memory,
    +   * so we won't acquire more memory than is actually needed to store the block.
    +   *
    +   * @return in case of success, the estimated the estimated size of the stored data. In case of
    +   *         failure, return a handle which allows the caller to either finish the serialization
    +   *         by spilling to disk or to deserialize the partially-serialized block and reconstruct
    +   *         the original input iterator. The caller must either fully consume this result
    +   *         iterator or call `discard()` on it in order to free the storage memory consumed by the
    +   *         partially-unrolled block.
    +   */
    +  private[storage] def putIteratorAsBytes(
    +      blockId: BlockId,
    +      values: Iterator[Any]): Either[PartiallySerializedBlock, Long] = {
    +
    +    require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
    +
    +    // Whether there is still enough memory for us to continue unrolling this block
    +    var keepUnrolling = true
    +    // Initial per-task memory to request for unrolling blocks (bytes).
    +    val initialMemoryThreshold = unrollMemoryThreshold
    +    // Keep track of unroll memory used by this particular block / putIterator() operation
    +    var unrollMemoryUsedByThisBlock = 0L
    +    // Underlying buffer for unrolling the block
    +    val redirectableStream = new RedirectableOutputStream
    +    val byteArrayChunkOutputStream = new ByteArrayChunkOutputStream(initialMemoryThreshold.toInt)
    +    redirectableStream.setOutputStream(byteArrayChunkOutputStream)
    +    val serializationStream: SerializationStream = {
    +      val ser = blockManager.defaultSerializer.newInstance()
    +      ser.serializeStream(blockManager.wrapForCompression(blockId, redirectableStream))
    +    }
    +
    +    // Request enough memory to begin unrolling
    +    keepUnrolling = reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold)
    +
    +    if (!keepUnrolling) {
    +      logWarning(s"Failed to reserve initial memory threshold of " +
    +        s"${Utils.bytesToString(initialMemoryThreshold)} for computing block $blockId in memory.")
    +    } else {
    +      unrollMemoryUsedByThisBlock += initialMemoryThreshold
    +    }
    +
    +    def reserveAdditionalMemoryIfNecessary(): Unit = {
    +      if (byteArrayChunkOutputStream.size > unrollMemoryUsedByThisBlock) {
    +        val amountToRequest = byteArrayChunkOutputStream.size - unrollMemoryUsedByThisBlock
    +        keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest)
    +        if (keepUnrolling) {
    +          unrollMemoryUsedByThisBlock += amountToRequest
    +        }
    +        unrollMemoryUsedByThisBlock += amountToRequest
    --- End diff --
    
    Ah, I think this case is a mistake which might have been introduced while repairing a merge conflict. We should only increment this if `keepUnrolling == true`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13980][WIP] Incrementally serialize blo...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11791#issuecomment-198087068
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/53456/
    Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13980] Incrementally serialize blocks w...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11791#issuecomment-201076458
  
    Merged build finished. Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13980] Incrementally serialize blocks w...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11791#issuecomment-199420135
  
    **[Test build #53700 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/53700/consoleFull)** for PR 11791 at commit [`a336c17`](https://github.com/apache/spark/commit/a336c177fec2cca5157619524c597032ef2212aa).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13980] Incrementally serialize blocks w...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11791#issuecomment-199424702
  
    Merged build finished. Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13980] Incrementally serialize blocks w...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11791#discussion_r57399711
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
    @@ -279,13 +275,117 @@ private[spark] class MemoryStore(
         }
       }
     
    +  /**
    +   * Attempt to put the given block in memory store as bytes.
    +   *
    +   * It's possible that the iterator is too large to materialize and store in memory. To avoid
    +   * OOM exceptions, this method will gradually unroll the iterator while periodically checking
    +   * whether there is enough free memory. If the block is successfully materialized, then the
    +   * temporary unroll memory used during the materialization is "transferred" to storage memory,
    +   * so we won't acquire more memory than is actually needed to store the block.
    +   *
    +   * @return in case of success, the estimated the estimated size of the stored data. In case of
    +   *         failure, return a handle which allows the caller to either finish the serialization
    +   *         by spilling to disk or to deserialize the partially-serialized block and reconstruct
    +   *         the original input iterator. The caller must either fully consume this result
    +   *         iterator or call `discard()` on it in order to free the storage memory consumed by the
    +   *         partially-unrolled block.
    +   */
    +  private[storage] def putIteratorAsBytes[T](
    +      blockId: BlockId,
    +      values: Iterator[T],
    +      classTag: ClassTag[T]): Either[PartiallySerializedBlock[T], Long] = {
    +
    +    require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
    +
    +    // Whether there is still enough memory for us to continue unrolling this block
    +    var keepUnrolling = true
    +    // Initial per-task memory to request for unrolling blocks (bytes).
    +    val initialMemoryThreshold = unrollMemoryThreshold
    +    // Keep track of unroll memory used by this particular block / putIterator() operation
    +    var unrollMemoryUsedByThisBlock = 0L
    +    // Underlying buffer for unrolling the block
    +    val redirectableStream = new RedirectableOutputStream
    +    val byteArrayChunkOutputStream = new ByteArrayChunkOutputStream(initialMemoryThreshold.toInt)
    +    redirectableStream.setOutputStream(byteArrayChunkOutputStream)
    +    val serializationStream: SerializationStream = {
    +      val ser = serializerManager.getSerializer(classTag).newInstance()
    +      ser.serializeStream(serializerManager.wrapForCompression(blockId, redirectableStream))
    +    }
    +
    +    // Request enough memory to begin unrolling
    +    keepUnrolling = reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold)
    +
    +    if (!keepUnrolling) {
    +      logWarning(s"Failed to reserve initial memory threshold of " +
    +        s"${Utils.bytesToString(initialMemoryThreshold)} for computing block $blockId in memory.")
    +    } else {
    +      unrollMemoryUsedByThisBlock += initialMemoryThreshold
    +    }
    +
    +    def reserveAdditionalMemoryIfNecessary(): Unit = {
    +      if (byteArrayChunkOutputStream.size > unrollMemoryUsedByThisBlock) {
    +        val amountToRequest = byteArrayChunkOutputStream.size - unrollMemoryUsedByThisBlock
    +        keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest)
    +        if (keepUnrolling) {
    +          unrollMemoryUsedByThisBlock += amountToRequest
    +        }
    +      }
    +    }
    +
    +    // Unroll this block safely, checking whether we have exceeded our threshold
    +    while (values.hasNext && keepUnrolling) {
    +      serializationStream.writeObject(values.next())(classTag)
    +      reserveAdditionalMemoryIfNecessary()
    +    }
    +
    +    // Make sure that we have enough memory to store the block. By this point, it is possible that
    +    // the block's actual memory usage has exceeded the unroll memory by a small amount, so we
    +    // perform one final call to attempt to allocate additional memory if necessary.
    +    if (keepUnrolling) {
    +      serializationStream.close()
    +      reserveAdditionalMemoryIfNecessary()
    +    }
    +
    +    if (keepUnrolling) {
    --- End diff --
    
    I don't get it. What change in functionality will there be if we moved those 2 lines into this if case? Did you separate them for readability?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13980] Incrementally serialize blocks w...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11791#issuecomment-199642644
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/53741/
    Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13980] Incrementally serialize blocks w...

Posted by nongli <gi...@git.apache.org>.
Github user nongli commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11791#discussion_r56914342
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
    @@ -244,13 +244,113 @@ private[spark] class MemoryStore(
         }
       }
     
    +  /**
    +   * Attempt to put the given block in memory store as bytes.
    +   *
    +   * It's possible that the iterator is too large to materialize and store in memory. To avoid
    +   * OOM exceptions, this method will gradually unroll the iterator while periodically checking
    +   * whether there is enough free memory. If the block is successfully materialized, then the
    +   * temporary unroll memory used during the materialization is "transferred" to storage memory,
    +   * so we won't acquire more memory than is actually needed to store the block.
    +   *
    +   * @return in case of success, the estimated the estimated size of the stored data. In case of
    +   *         failure, return a handle which allows the caller to either finish the serialization
    +   *         by spilling to disk or to deserialize the partially-serialized block and reconstruct
    +   *         the original input iterator. The caller must either fully consume this result
    +   *         iterator or call `discard()` on it in order to free the storage memory consumed by the
    +   *         partially-unrolled block.
    +   */
    +  private[storage] def putIteratorAsBytes(
    +      blockId: BlockId,
    +      values: Iterator[Any]): Either[PartiallySerializedBlock, Long] = {
    +
    +    require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
    +
    +    // Whether there is still enough memory for us to continue unrolling this block
    +    var keepUnrolling = true
    +    // Initial per-task memory to request for unrolling blocks (bytes).
    +    val initialMemoryThreshold = unrollMemoryThreshold
    +    // Keep track of unroll memory used by this particular block / putIterator() operation
    +    var unrollMemoryUsedByThisBlock = 0L
    +    // Underlying buffer for unrolling the block
    +    val redirectableStream = new RedirectableOutputStream
    +    val byteArrayChunkOutputStream = new ByteArrayChunkOutputStream(initialMemoryThreshold.toInt)
    +    redirectableStream.setOutputStream(byteArrayChunkOutputStream)
    +    val serializationStream: SerializationStream = {
    +      val ser = blockManager.defaultSerializer.newInstance()
    +      ser.serializeStream(blockManager.wrapForCompression(blockId, redirectableStream))
    +    }
    +
    +    // Request enough memory to begin unrolling
    +    keepUnrolling = reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold)
    +
    +    if (!keepUnrolling) {
    +      logWarning(s"Failed to reserve initial memory threshold of " +
    +        s"${Utils.bytesToString(initialMemoryThreshold)} for computing block $blockId in memory.")
    +    } else {
    +      unrollMemoryUsedByThisBlock += initialMemoryThreshold
    +    }
    +
    +    def reserveAdditionalMemoryIfNecessary(): Unit = {
    +      if (byteArrayChunkOutputStream.size > unrollMemoryUsedByThisBlock) {
    +        val amountToRequest = byteArrayChunkOutputStream.size - unrollMemoryUsedByThisBlock
    +        keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest)
    +        if (keepUnrolling) {
    +          unrollMemoryUsedByThisBlock += amountToRequest
    +        }
    +        unrollMemoryUsedByThisBlock += amountToRequest
    --- End diff --
    
    i dont understand why you add this twice in some cases


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13980] Incrementally serialize blocks w...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11791#discussion_r57400901
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
    @@ -279,13 +275,117 @@ private[spark] class MemoryStore(
         }
       }
     
    +  /**
    +   * Attempt to put the given block in memory store as bytes.
    +   *
    +   * It's possible that the iterator is too large to materialize and store in memory. To avoid
    +   * OOM exceptions, this method will gradually unroll the iterator while periodically checking
    +   * whether there is enough free memory. If the block is successfully materialized, then the
    +   * temporary unroll memory used during the materialization is "transferred" to storage memory,
    +   * so we won't acquire more memory than is actually needed to store the block.
    +   *
    +   * @return in case of success, the estimated the estimated size of the stored data. In case of
    +   *         failure, return a handle which allows the caller to either finish the serialization
    +   *         by spilling to disk or to deserialize the partially-serialized block and reconstruct
    +   *         the original input iterator. The caller must either fully consume this result
    +   *         iterator or call `discard()` on it in order to free the storage memory consumed by the
    +   *         partially-unrolled block.
    +   */
    +  private[storage] def putIteratorAsBytes[T](
    +      blockId: BlockId,
    +      values: Iterator[T],
    +      classTag: ClassTag[T]): Either[PartiallySerializedBlock[T], Long] = {
    +
    +    require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
    +
    +    // Whether there is still enough memory for us to continue unrolling this block
    +    var keepUnrolling = true
    +    // Initial per-task memory to request for unrolling blocks (bytes).
    +    val initialMemoryThreshold = unrollMemoryThreshold
    +    // Keep track of unroll memory used by this particular block / putIterator() operation
    +    var unrollMemoryUsedByThisBlock = 0L
    +    // Underlying buffer for unrolling the block
    +    val redirectableStream = new RedirectableOutputStream
    +    val byteArrayChunkOutputStream = new ByteArrayChunkOutputStream(initialMemoryThreshold.toInt)
    +    redirectableStream.setOutputStream(byteArrayChunkOutputStream)
    +    val serializationStream: SerializationStream = {
    +      val ser = serializerManager.getSerializer(classTag).newInstance()
    +      ser.serializeStream(serializerManager.wrapForCompression(blockId, redirectableStream))
    +    }
    +
    +    // Request enough memory to begin unrolling
    +    keepUnrolling = reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold)
    +
    +    if (!keepUnrolling) {
    +      logWarning(s"Failed to reserve initial memory threshold of " +
    +        s"${Utils.bytesToString(initialMemoryThreshold)} for computing block $blockId in memory.")
    +    } else {
    +      unrollMemoryUsedByThisBlock += initialMemoryThreshold
    +    }
    +
    +    def reserveAdditionalMemoryIfNecessary(): Unit = {
    +      if (byteArrayChunkOutputStream.size > unrollMemoryUsedByThisBlock) {
    +        val amountToRequest = byteArrayChunkOutputStream.size - unrollMemoryUsedByThisBlock
    +        keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest)
    +        if (keepUnrolling) {
    +          unrollMemoryUsedByThisBlock += amountToRequest
    +        }
    +      }
    +    }
    +
    +    // Unroll this block safely, checking whether we have exceeded our threshold
    +    while (values.hasNext && keepUnrolling) {
    +      serializationStream.writeObject(values.next())(classTag)
    +      reserveAdditionalMemoryIfNecessary()
    +    }
    +
    +    // Make sure that we have enough memory to store the block. By this point, it is possible that
    +    // the block's actual memory usage has exceeded the unroll memory by a small amount, so we
    +    // perform one final call to attempt to allocate additional memory if necessary.
    +    if (keepUnrolling) {
    +      serializationStream.close()
    +      reserveAdditionalMemoryIfNecessary()
    +    }
    +
    +    if (keepUnrolling) {
    --- End diff --
    
    After the `reserveAdditionalMemoryIfNecessary()` call on line 347, `keepUnrolling` can become `false`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13980] Incrementally serialize blocks w...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11791#discussion_r57397179
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
    @@ -279,13 +275,117 @@ private[spark] class MemoryStore(
         }
       }
     
    +  /**
    +   * Attempt to put the given block in memory store as bytes.
    +   *
    +   * It's possible that the iterator is too large to materialize and store in memory. To avoid
    +   * OOM exceptions, this method will gradually unroll the iterator while periodically checking
    +   * whether there is enough free memory. If the block is successfully materialized, then the
    +   * temporary unroll memory used during the materialization is "transferred" to storage memory,
    +   * so we won't acquire more memory than is actually needed to store the block.
    +   *
    +   * @return in case of success, the estimated the estimated size of the stored data. In case of
    +   *         failure, return a handle which allows the caller to either finish the serialization
    +   *         by spilling to disk or to deserialize the partially-serialized block and reconstruct
    +   *         the original input iterator. The caller must either fully consume this result
    +   *         iterator or call `discard()` on it in order to free the storage memory consumed by the
    +   *         partially-unrolled block.
    +   */
    +  private[storage] def putIteratorAsBytes[T](
    +      blockId: BlockId,
    +      values: Iterator[T],
    +      classTag: ClassTag[T]): Either[PartiallySerializedBlock[T], Long] = {
    +
    +    require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
    +
    +    // Whether there is still enough memory for us to continue unrolling this block
    +    var keepUnrolling = true
    +    // Initial per-task memory to request for unrolling blocks (bytes).
    +    val initialMemoryThreshold = unrollMemoryThreshold
    +    // Keep track of unroll memory used by this particular block / putIterator() operation
    +    var unrollMemoryUsedByThisBlock = 0L
    +    // Underlying buffer for unrolling the block
    +    val redirectableStream = new RedirectableOutputStream
    +    val byteArrayChunkOutputStream = new ByteArrayChunkOutputStream(initialMemoryThreshold.toInt)
    +    redirectableStream.setOutputStream(byteArrayChunkOutputStream)
    +    val serializationStream: SerializationStream = {
    +      val ser = serializerManager.getSerializer(classTag).newInstance()
    +      ser.serializeStream(serializerManager.wrapForCompression(blockId, redirectableStream))
    +    }
    +
    +    // Request enough memory to begin unrolling
    +    keepUnrolling = reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold)
    +
    +    if (!keepUnrolling) {
    +      logWarning(s"Failed to reserve initial memory threshold of " +
    +        s"${Utils.bytesToString(initialMemoryThreshold)} for computing block $blockId in memory.")
    +    } else {
    +      unrollMemoryUsedByThisBlock += initialMemoryThreshold
    +    }
    +
    +    def reserveAdditionalMemoryIfNecessary(): Unit = {
    +      if (byteArrayChunkOutputStream.size > unrollMemoryUsedByThisBlock) {
    +        val amountToRequest = byteArrayChunkOutputStream.size - unrollMemoryUsedByThisBlock
    +        keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest)
    +        if (keepUnrolling) {
    +          unrollMemoryUsedByThisBlock += amountToRequest
    +        }
    +      }
    +    }
    +
    +    // Unroll this block safely, checking whether we have exceeded our threshold
    +    while (values.hasNext && keepUnrolling) {
    +      serializationStream.writeObject(values.next())(classTag)
    +      reserveAdditionalMemoryIfNecessary()
    +    }
    +
    +    // Make sure that we have enough memory to store the block. By this point, it is possible that
    +    // the block's actual memory usage has exceeded the unroll memory by a small amount, so we
    +    // perform one final call to attempt to allocate additional memory if necessary.
    +    if (keepUnrolling) {
    +      serializationStream.close()
    +      reserveAdditionalMemoryIfNecessary()
    +    }
    +
    +    if (keepUnrolling) {
    --- End diff --
    
    See the comment above the first one.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13980] Incrementally serialize blocks w...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11791#issuecomment-199642510
  
    **[Test build #53741 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/53741/consoleFull)** for PR 11791 at commit [`cec1f02`](https://github.com/apache/spark/commit/cec1f02b36a0a52f977b800847d9a182b2958ff0).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13980] Incrementally serialize blocks w...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on the pull request:

    https://github.com/apache/spark/pull/11791#issuecomment-201085219
  
    Merging to master.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13980] Incrementally serialize blocks w...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11791#discussion_r57394875
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -876,21 +876,40 @@ private[spark] class BlockManager(
           if (level.useMemory) {
             // Put it in memory first, even if it also has useDisk set to true;
             // We will drop it to disk later if the memory store can't hold it.
    -        memoryStore.putIterator(blockId, iterator(), level, classTag) match {
    -          case Right(s) =>
    -            size = s
    -          case Left(iter) =>
    -            // Not enough space to unroll this block; drop to disk if applicable
    -            if (level.useDisk) {
    -              logWarning(s"Persisting block $blockId to disk instead.")
    -              diskStore.put(blockId) { fileOutputStream =>
    -                serializerManager.dataSerializeStream(blockId, fileOutputStream, iter)(classTag)
    +        if (level.deserialized) {
    +          memoryStore.putIteratorAsValues(blockId, iterator(), classTag) match {
    +            case Right(s) =>
    +              size = s
    +            case Left(iter) =>
    +              // Not enough space to unroll this block; drop to disk if applicable
    +              if (level.useDisk) {
    +                logWarning(s"Persisting block $blockId to disk instead.")
    +                diskStore.put(blockId) { fileOutputStream =>
    +                  serializerManager.dataSerializeStream(blockId, fileOutputStream, iter)(classTag)
    +                }
    +                size = diskStore.getSize(blockId)
    +              } else {
    +                iteratorFromFailedMemoryStorePut = Some(iter)
                   }
    -              size = diskStore.getSize(blockId)
    -            } else {
    -              iteratorFromFailedMemoryStorePut = Some(iter)
    -            }
    +          }
    +        } else { // !level.deserialized
    +          memoryStore.putIteratorAsBytes(blockId, iterator(), classTag) match {
    +            case Right(s) =>
    +              size = s
    +            case Left(partiallySerializedValues) =>
    +              // Not enough space to unroll this block; drop to disk if applicable
    +              if (level.useDisk) {
    +                logWarning(s"Persisting block $blockId to disk instead.")
    +                diskStore.put(blockId) { fileOutputStream =>
    +                  partiallySerializedValues.finishWritingToStream(fileOutputStream)
    +                }
    +                size = diskStore.getSize(blockId)
    +              } else {
    +                iteratorFromFailedMemoryStorePut = Some(partiallySerializedValues.valuesIterator)
    +              }
    --- End diff --
    
    You can clean it up easily here:
    ```
    val putResult: Either[...] =
      if (level.deserialized) {
        memoryStore.putIteratorAsValues(...)
      } else {
        memoryStore.putIteratorAsBytes(...)
      }
    putResult match {
      case Right(s) => s
      case Left(partiallySerializedValues) => ...
    }
    ```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13980] Incrementally serialize blocks w...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on the pull request:

    https://github.com/apache/spark/pull/11791#issuecomment-200978222
  
    Jenkins, retest this please.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13980] Incrementally serialize blocks w...

Posted by nongli <gi...@git.apache.org>.
Github user nongli commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11791#discussion_r56914517
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
    @@ -244,13 +244,113 @@ private[spark] class MemoryStore(
         }
       }
     
    +  /**
    +   * Attempt to put the given block in memory store as bytes.
    +   *
    +   * It's possible that the iterator is too large to materialize and store in memory. To avoid
    +   * OOM exceptions, this method will gradually unroll the iterator while periodically checking
    +   * whether there is enough free memory. If the block is successfully materialized, then the
    +   * temporary unroll memory used during the materialization is "transferred" to storage memory,
    +   * so we won't acquire more memory than is actually needed to store the block.
    +   *
    +   * @return in case of success, the estimated the estimated size of the stored data. In case of
    +   *         failure, return a handle which allows the caller to either finish the serialization
    +   *         by spilling to disk or to deserialize the partially-serialized block and reconstruct
    +   *         the original input iterator. The caller must either fully consume this result
    +   *         iterator or call `discard()` on it in order to free the storage memory consumed by the
    +   *         partially-unrolled block.
    +   */
    +  private[storage] def putIteratorAsBytes(
    +      blockId: BlockId,
    +      values: Iterator[Any]): Either[PartiallySerializedBlock, Long] = {
    +
    +    require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
    +
    +    // Whether there is still enough memory for us to continue unrolling this block
    +    var keepUnrolling = true
    +    // Initial per-task memory to request for unrolling blocks (bytes).
    +    val initialMemoryThreshold = unrollMemoryThreshold
    +    // Keep track of unroll memory used by this particular block / putIterator() operation
    +    var unrollMemoryUsedByThisBlock = 0L
    +    // Underlying buffer for unrolling the block
    +    val redirectableStream = new RedirectableOutputStream
    +    val byteArrayChunkOutputStream = new ByteArrayChunkOutputStream(initialMemoryThreshold.toInt)
    +    redirectableStream.setOutputStream(byteArrayChunkOutputStream)
    +    val serializationStream: SerializationStream = {
    +      val ser = blockManager.defaultSerializer.newInstance()
    +      ser.serializeStream(blockManager.wrapForCompression(blockId, redirectableStream))
    +    }
    +
    +    // Request enough memory to begin unrolling
    +    keepUnrolling = reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold)
    +
    +    if (!keepUnrolling) {
    +      logWarning(s"Failed to reserve initial memory threshold of " +
    +        s"${Utils.bytesToString(initialMemoryThreshold)} for computing block $blockId in memory.")
    +    } else {
    +      unrollMemoryUsedByThisBlock += initialMemoryThreshold
    +    }
    +
    +    def reserveAdditionalMemoryIfNecessary(): Unit = {
    +      if (byteArrayChunkOutputStream.size > unrollMemoryUsedByThisBlock) {
    +        val amountToRequest = byteArrayChunkOutputStream.size - unrollMemoryUsedByThisBlock
    +        keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest)
    +        if (keepUnrolling) {
    +          unrollMemoryUsedByThisBlock += amountToRequest
    +        }
    +        unrollMemoryUsedByThisBlock += amountToRequest
    +      }
    +    }
    +
    +    // Unroll this block safely, checking whether we have exceeded our threshold
    +    while (values.hasNext && keepUnrolling) {
    +      serializationStream.writeObject(values.next())
    +      reserveAdditionalMemoryIfNecessary()
    +    }
    +
    +    if (keepUnrolling) {
    --- End diff --
    
    move this into line 317?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13980] Incrementally serialize blocks w...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11791#issuecomment-200978538
  
    Merged build finished. Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13980][WIP] Incrementally serialize blo...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11791#issuecomment-198218448
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/53496/
    Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13980] Incrementally serialize blocks w...

Posted by nongli <gi...@git.apache.org>.
Github user nongli commented on the pull request:

    https://github.com/apache/spark/pull/11791#issuecomment-201035800
  
    LGTM


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13980] Incrementally serialize blocks w...

Posted by nongli <gi...@git.apache.org>.
Github user nongli commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11791#discussion_r56914928
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
    @@ -244,13 +244,113 @@ private[spark] class MemoryStore(
         }
       }
     
    +  /**
    +   * Attempt to put the given block in memory store as bytes.
    +   *
    +   * It's possible that the iterator is too large to materialize and store in memory. To avoid
    +   * OOM exceptions, this method will gradually unroll the iterator while periodically checking
    +   * whether there is enough free memory. If the block is successfully materialized, then the
    +   * temporary unroll memory used during the materialization is "transferred" to storage memory,
    +   * so we won't acquire more memory than is actually needed to store the block.
    +   *
    +   * @return in case of success, the estimated the estimated size of the stored data. In case of
    +   *         failure, return a handle which allows the caller to either finish the serialization
    +   *         by spilling to disk or to deserialize the partially-serialized block and reconstruct
    +   *         the original input iterator. The caller must either fully consume this result
    +   *         iterator or call `discard()` on it in order to free the storage memory consumed by the
    +   *         partially-unrolled block.
    +   */
    +  private[storage] def putIteratorAsBytes(
    +      blockId: BlockId,
    +      values: Iterator[Any]): Either[PartiallySerializedBlock, Long] = {
    +
    +    require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
    +
    +    // Whether there is still enough memory for us to continue unrolling this block
    +    var keepUnrolling = true
    +    // Initial per-task memory to request for unrolling blocks (bytes).
    +    val initialMemoryThreshold = unrollMemoryThreshold
    +    // Keep track of unroll memory used by this particular block / putIterator() operation
    +    var unrollMemoryUsedByThisBlock = 0L
    +    // Underlying buffer for unrolling the block
    +    val redirectableStream = new RedirectableOutputStream
    +    val byteArrayChunkOutputStream = new ByteArrayChunkOutputStream(initialMemoryThreshold.toInt)
    +    redirectableStream.setOutputStream(byteArrayChunkOutputStream)
    +    val serializationStream: SerializationStream = {
    +      val ser = blockManager.defaultSerializer.newInstance()
    +      ser.serializeStream(blockManager.wrapForCompression(blockId, redirectableStream))
    +    }
    +
    +    // Request enough memory to begin unrolling
    +    keepUnrolling = reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold)
    +
    +    if (!keepUnrolling) {
    +      logWarning(s"Failed to reserve initial memory threshold of " +
    +        s"${Utils.bytesToString(initialMemoryThreshold)} for computing block $blockId in memory.")
    +    } else {
    +      unrollMemoryUsedByThisBlock += initialMemoryThreshold
    +    }
    +
    +    def reserveAdditionalMemoryIfNecessary(): Unit = {
    +      if (byteArrayChunkOutputStream.size > unrollMemoryUsedByThisBlock) {
    +        val amountToRequest = byteArrayChunkOutputStream.size - unrollMemoryUsedByThisBlock
    +        keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest)
    +        if (keepUnrolling) {
    +          unrollMemoryUsedByThisBlock += amountToRequest
    +        }
    +        unrollMemoryUsedByThisBlock += amountToRequest
    +      }
    +    }
    +
    +    // Unroll this block safely, checking whether we have exceeded our threshold
    +    while (values.hasNext && keepUnrolling) {
    +      serializationStream.writeObject(values.next())
    +      reserveAdditionalMemoryIfNecessary()
    +    }
    +
    +    if (keepUnrolling) {
    +      serializationStream.close()
    +      reserveAdditionalMemoryIfNecessary()
    +    }
    +
    +    if (keepUnrolling) {
    +      val entry = SerializedMemoryEntry(
    +        new ChunkedByteBuffer(byteArrayChunkOutputStream.toArrays.map(ByteBuffer.wrap)))
    +      // Synchronize so that transfer is atomic
    +      memoryManager.synchronized {
    --- End diff --
    
    I see this replicated in a few places (transferUnrollToStorage()). It might be easier to have releaseUnrollMemoryForThisTask take another argument to optionally transfer it to storage


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13980] Incrementally serialize blocks w...

Posted by nongli <gi...@git.apache.org>.
Github user nongli commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11791#discussion_r57390963
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
    @@ -279,13 +275,117 @@ private[spark] class MemoryStore(
         }
       }
     
    +  /**
    +   * Attempt to put the given block in memory store as bytes.
    +   *
    +   * It's possible that the iterator is too large to materialize and store in memory. To avoid
    +   * OOM exceptions, this method will gradually unroll the iterator while periodically checking
    +   * whether there is enough free memory. If the block is successfully materialized, then the
    +   * temporary unroll memory used during the materialization is "transferred" to storage memory,
    +   * so we won't acquire more memory than is actually needed to store the block.
    +   *
    +   * @return in case of success, the estimated the estimated size of the stored data. In case of
    +   *         failure, return a handle which allows the caller to either finish the serialization
    +   *         by spilling to disk or to deserialize the partially-serialized block and reconstruct
    +   *         the original input iterator. The caller must either fully consume this result
    +   *         iterator or call `discard()` on it in order to free the storage memory consumed by the
    +   *         partially-unrolled block.
    +   */
    +  private[storage] def putIteratorAsBytes[T](
    +      blockId: BlockId,
    +      values: Iterator[T],
    +      classTag: ClassTag[T]): Either[PartiallySerializedBlock[T], Long] = {
    +
    +    require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
    +
    +    // Whether there is still enough memory for us to continue unrolling this block
    +    var keepUnrolling = true
    +    // Initial per-task memory to request for unrolling blocks (bytes).
    +    val initialMemoryThreshold = unrollMemoryThreshold
    +    // Keep track of unroll memory used by this particular block / putIterator() operation
    +    var unrollMemoryUsedByThisBlock = 0L
    +    // Underlying buffer for unrolling the block
    +    val redirectableStream = new RedirectableOutputStream
    +    val byteArrayChunkOutputStream = new ByteArrayChunkOutputStream(initialMemoryThreshold.toInt)
    +    redirectableStream.setOutputStream(byteArrayChunkOutputStream)
    +    val serializationStream: SerializationStream = {
    +      val ser = serializerManager.getSerializer(classTag).newInstance()
    +      ser.serializeStream(serializerManager.wrapForCompression(blockId, redirectableStream))
    +    }
    +
    +    // Request enough memory to begin unrolling
    +    keepUnrolling = reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold)
    +
    +    if (!keepUnrolling) {
    +      logWarning(s"Failed to reserve initial memory threshold of " +
    +        s"${Utils.bytesToString(initialMemoryThreshold)} for computing block $blockId in memory.")
    +    } else {
    +      unrollMemoryUsedByThisBlock += initialMemoryThreshold
    +    }
    +
    +    def reserveAdditionalMemoryIfNecessary(): Unit = {
    +      if (byteArrayChunkOutputStream.size > unrollMemoryUsedByThisBlock) {
    +        val amountToRequest = byteArrayChunkOutputStream.size - unrollMemoryUsedByThisBlock
    +        keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest)
    +        if (keepUnrolling) {
    +          unrollMemoryUsedByThisBlock += amountToRequest
    +        }
    +      }
    +    }
    +
    +    // Unroll this block safely, checking whether we have exceeded our threshold
    +    while (values.hasNext && keepUnrolling) {
    +      serializationStream.writeObject(values.next())(classTag)
    +      reserveAdditionalMemoryIfNecessary()
    +    }
    +
    +    // Make sure that we have enough memory to store the block. By this point, it is possible that
    +    // the block's actual memory usage has exceeded the unroll memory by a small amount, so we
    +    // perform one final call to attempt to allocate additional memory if necessary.
    --- End diff --
    
    This is because of the call to close? That can use more memory?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13980] Incrementally serialize blocks w...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11791#issuecomment-199642642
  
    Merged build finished. Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13980] Incrementally serialize blocks w...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11791#discussion_r57034760
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
    @@ -244,13 +244,113 @@ private[spark] class MemoryStore(
         }
       }
     
    +  /**
    +   * Attempt to put the given block in memory store as bytes.
    +   *
    +   * It's possible that the iterator is too large to materialize and store in memory. To avoid
    +   * OOM exceptions, this method will gradually unroll the iterator while periodically checking
    +   * whether there is enough free memory. If the block is successfully materialized, then the
    +   * temporary unroll memory used during the materialization is "transferred" to storage memory,
    +   * so we won't acquire more memory than is actually needed to store the block.
    +   *
    +   * @return in case of success, the estimated the estimated size of the stored data. In case of
    +   *         failure, return a handle which allows the caller to either finish the serialization
    +   *         by spilling to disk or to deserialize the partially-serialized block and reconstruct
    +   *         the original input iterator. The caller must either fully consume this result
    +   *         iterator or call `discard()` on it in order to free the storage memory consumed by the
    +   *         partially-unrolled block.
    +   */
    +  private[storage] def putIteratorAsBytes(
    +      blockId: BlockId,
    +      values: Iterator[Any]): Either[PartiallySerializedBlock, Long] = {
    +
    +    require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
    +
    +    // Whether there is still enough memory for us to continue unrolling this block
    +    var keepUnrolling = true
    +    // Initial per-task memory to request for unrolling blocks (bytes).
    +    val initialMemoryThreshold = unrollMemoryThreshold
    +    // Keep track of unroll memory used by this particular block / putIterator() operation
    +    var unrollMemoryUsedByThisBlock = 0L
    +    // Underlying buffer for unrolling the block
    +    val redirectableStream = new RedirectableOutputStream
    +    val byteArrayChunkOutputStream = new ByteArrayChunkOutputStream(initialMemoryThreshold.toInt)
    +    redirectableStream.setOutputStream(byteArrayChunkOutputStream)
    +    val serializationStream: SerializationStream = {
    +      val ser = blockManager.defaultSerializer.newInstance()
    +      ser.serializeStream(blockManager.wrapForCompression(blockId, redirectableStream))
    +    }
    +
    +    // Request enough memory to begin unrolling
    +    keepUnrolling = reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold)
    +
    +    if (!keepUnrolling) {
    +      logWarning(s"Failed to reserve initial memory threshold of " +
    +        s"${Utils.bytesToString(initialMemoryThreshold)} for computing block $blockId in memory.")
    +    } else {
    +      unrollMemoryUsedByThisBlock += initialMemoryThreshold
    +    }
    +
    +    def reserveAdditionalMemoryIfNecessary(): Unit = {
    +      if (byteArrayChunkOutputStream.size > unrollMemoryUsedByThisBlock) {
    +        val amountToRequest = byteArrayChunkOutputStream.size - unrollMemoryUsedByThisBlock
    +        keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest)
    +        if (keepUnrolling) {
    +          unrollMemoryUsedByThisBlock += amountToRequest
    +        }
    +        unrollMemoryUsedByThisBlock += amountToRequest
    --- End diff --
    
    This has now been fixed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13980] Incrementally serialize blocks w...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on the pull request:

    https://github.com/apache/spark/pull/11791#issuecomment-199954971
  
    Doing a bit of refactoring now in order to make it easier to write proper unit tests for this. Therefore I'd hold off on the final review pass here for a little bit and review my SPARK-3000 patch instead.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13980][WIP] Incrementally serialize blo...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11791#issuecomment-198037121
  
    **[Test build #53456 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/53456/consoleFull)** for PR 11791 at commit [`7dc3623`](https://github.com/apache/spark/commit/7dc362331f3f549670ecd9488db456b4136a3ad7).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13980] Incrementally serialize blocks w...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11791#issuecomment-200470257
  
    **[Test build #53957 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/53957/consoleFull)** for PR 11791 at commit [`768a8d9`](https://github.com/apache/spark/commit/768a8d95ef394553ced64c1fad683127e54e699b).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13980][WIP] Incrementally serialize blo...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11791#issuecomment-198185758
  
    **[Test build #53496 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/53496/consoleFull)** for PR 11791 at commit [`5489748`](https://github.com/apache/spark/commit/5489748d7e8b0d4aa7a0e7331a1a4a02f65d977f).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13980] Incrementally serialize blocks w...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11791#issuecomment-199642140
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/53742/
    Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13980] Incrementally serialize blocks w...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on the pull request:

    https://github.com/apache/spark/pull/11791#issuecomment-199420351
  
    This is no longer WIP and should be ready for review now.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13980] Incrementally serialize blocks w...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on the pull request:

    https://github.com/apache/spark/pull/11791#issuecomment-199436402
  
    Jenkins, retest this please.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13980][WIP] Incrementally serialize blo...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11791#issuecomment-198218447
  
    Merged build finished. Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13980] Incrementally serialize blocks w...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11791#issuecomment-201076288
  
    **[Test build #54094 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/54094/consoleFull)** for PR 11791 at commit [`749df73`](https://github.com/apache/spark/commit/749df73c8dee8733a2617ff9ab8563fe3ed9de66).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org