Posted to reviews@spark.apache.org by squito <gi...@git.apache.org> on 2018/05/29 17:01:49 UTC

[GitHub] spark pull request #21451: [SPARK-24296][CORE][WIP] Replicate large blocks a...

GitHub user squito opened a pull request:

    https://github.com/apache/spark/pull/21451

    [SPARK-24296][CORE][WIP] Replicate large blocks as a stream.

    When replicating large cached RDD blocks, it can be helpful to replicate
    them as a stream, to avoid using large amounts of memory during the
    transfer.  This also allows blocks larger than 2GB to be replicated.
    
    Added unit tests in DistributedSuite.  Also ran tests on a cluster for
    blocks > 2gb.
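
A minimal sketch of the idea in the description above, written in Java: the receiver appends each arriving chunk to a temporary file instead of holding the whole block in memory. The class and method names (`StreamingBlockReceiver`, `onData`, `onComplete`) are illustrative assumptions, not the classes added by this PR.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical receiver: names are illustrative, not taken from the PR.
class StreamingBlockReceiver {
  private final Path tmpFile;
  private final FileChannel channel;
  private long bytesReceived = 0L;

  StreamingBlockReceiver() {
    try {
      tmpFile = Files.createTempFile("replicated-block", ".tmp");
      channel = FileChannel.open(tmpFile, StandardOpenOption.WRITE);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  // Called once per chunk; only the current chunk is held in memory.
  void onData(ByteBuffer chunk) {
    try {
      bytesReceived += chunk.remaining();
      // A single write() may be partial, so loop until the chunk is drained.
      while (chunk.hasRemaining()) {
        channel.write(chunk);
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  // Called at end of stream; returns the file that now holds the block.
  Path onComplete() {
    try {
      channel.close();
      return tmpFile;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  long bytesReceived() {
    return bytesReceived;
  }
}
```

Because the data goes to a file rather than a single in-memory buffer, the total size is tracked as a `long` and is not bounded by the 2GB limit of one buffer.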

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/squito/spark clean_replication

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/21451.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #21451
    
----
commit 05967808f5440835919f02d6c5d0d3563482d304
Author: Imran Rashid <ir...@...>
Date:   2018-05-02T14:55:15Z

    [SPARK-6237][NETWORK] Network-layer changes to allow stream upload.
    
    These changes allow an RPCHandler to receive an upload as a stream of
    data, without having to buffer the entire message in the FrameDecoder.
    The primary use case is for replicating large blocks.
    
    Added unit tests.

commit 43658df6d6b7dffacd528a2573e8846ab6469e81
Author: Imran Rashid <ir...@...>
Date:   2018-05-23T03:59:40Z

    [SPARK-24307][CORE] Support reading remote cached partitions > 2gb
    
    (1) Netty's ByteBuf cannot support data > 2gb.  So to transfer data from a
    ChunkedByteBuffer over the network, we use a custom version of
    FileRegion which is backed by the ChunkedByteBuffer.
    
    (2) On the receiving end, we need to expose all the data in a
    FileSegmentManagedBuffer as a ChunkedByteBuffer.  We do that by memory
    mapping the entire file in chunks.
    
    Added unit tests.  Also tested on a cluster with remote cache reads >
    2gb (in memory and on disk).

commit 7e517e4ea0ff66dc57121b54fdd71f8391edd8f2
Author: Imran Rashid <ir...@...>
Date:   2018-05-15T16:48:51Z

    [SPARK-24296][CORE] Replicate large blocks as a stream.
    
    When replicating large cached RDD blocks, it can be helpful to replicate
    them as a stream, to avoid using large amounts of memory during the
    transfer.  This also allows blocks larger than 2GB to be replicated.
    
    Added unit tests in DistributedSuite.  Also ran tests on a cluster for
    blocks > 2gb.

----
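
Point (2) of the SPARK-24307 commit message above describes memory mapping an entire file in chunks. A rough Java sketch of that technique, assuming a hypothetical helper class `ChunkedFileMapper` (not part of Spark) and a caller-chosen maximum chunk size, could look like this:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: illustrates the technique only, not Spark's code.
class ChunkedFileMapper {

  // Maps the whole file read-only as a list of buffers of at most maxChunkSize bytes each.
  static List<ByteBuffer> mapInChunks(Path file, int maxChunkSize) {
    if (maxChunkSize <= 0) {
      throw new IllegalArgumentException("maxChunkSize must be positive");
    }
    try (FileChannel channel = FileChannel.open(file, StandardOpenOption.READ)) {
      List<ByteBuffer> chunks = new ArrayList<>();
      long size = channel.size();
      long offset = 0L;
      while (offset < size) {
        // Each mapping covers at most maxChunkSize bytes, so no single buffer exceeds it.
        long length = Math.min((long) maxChunkSize, size - offset);
        chunks.add(channel.map(FileChannel.MapMode.READ_ONLY, offset, length));
        offset += length;
      }
      // Mappings stay valid after the channel is closed.
      return chunks;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  // Test helper: creates a temp file of numBytes zero bytes.
  static Path writeTempFile(int numBytes) {
    try {
      Path tmp = Files.createTempFile("chunked", ".bin");
      Files.write(tmp, new byte[numBytes]);
      return tmp;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

Offsets are kept as `long` values while each individual mapping stays within `int` range, which is what allows a file larger than 2GB to be exposed as a sequence of buffers.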


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21451: [SPARK-24296][CORE][WIP] Replicate large blocks as a str...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21451
  
    **[Test build #93250 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93250/testReport)** for PR 21451 at commit [`335e26d`](https://github.com/apache/spark/commit/335e26d168dc99e7317175da8732ff691ff512f2).


---



[GitHub] spark issue #21451: [SPARK-24296][CORE][WIP] Replicate large blocks as a str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21451
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21451: [SPARK-24296][CORE] Replicate large blocks as a stream.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21451
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #21451: [SPARK-24296][CORE][WIP] Replicate large blocks a...

Posted by witgo <gi...@git.apache.org>.
Github user witgo commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21451#discussion_r191628993
  
    --- Diff: common/network-common/src/main/java/org/apache/spark/network/server/RpcHandler.java ---
    @@ -38,15 +38,24 @@
        *
        * This method will not be called in parallel for a single TransportClient (i.e., channel).
        *
    +   * The rpc *might* included a data stream in <code>streamData</code> (eg. for uploading a large
    +   * amount of data which should not be buffered in memory here).  Any errors while handling the
    +   * streamData will lead to failing this entire connection -- all other in-flight rpcs will fail.
    +   * If stream data is not null, you *must* call <code>streamData.registerStreamCallback</code>
    +   * before this method returns.
    +   *
        * @param client A channel client which enables the handler to make requests back to the sender
        *               of this RPC. This will always be the exact same object for a particular channel.
        * @param message The serialized bytes of the RPC.
    +   * @param streamData StreamData if there is data which is meant to be read via a StreamCallback;
    +   *                   otherwise it is null.
        * @param callback Callback which should be invoked exactly once upon success or failure of the
        *                 RPC.
        */
       public abstract void receive(
           TransportClient client,
           ByteBuffer message,
    +      StreamData streamData,
    --- End diff --
    
    It's not necessary to add a parameter.  Change the message parameter to InputStream.


---



[GitHub] spark issue #21451: [SPARK-24296][CORE] Replicate large blocks as a stream.

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21451
  
    **[Test build #94778 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94778/testReport)** for PR 21451 at commit [`44149a5`](https://github.com/apache/spark/commit/44149a51fd817324488508ba006da2d3272db490).


---



[GitHub] spark pull request #21451: [SPARK-24296][CORE] Replicate large blocks as a s...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21451#discussion_r208682245
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -404,6 +405,47 @@ private[spark] class BlockManager(
         putBytes(blockId, new ChunkedByteBuffer(data.nioByteBuffer()), level)(classTag)
       }
     
    +  override def putBlockDataAsStream(
    +      blockId: BlockId,
    +      level: StorageLevel,
    +      classTag: ClassTag[_]): StreamCallbackWithID = {
    +    // TODO if we're going to only put the data in the disk store, we should just write it directly
    +    // to the final location, but that would require a deeper refactor of this code.  So instead
    +    // we just write to a temp file, and call putBytes on the data in that file.
    +    val tmpFile = diskBlockManager.createTempLocalBlock()._2
    +    new StreamCallbackWithID {
    +      val channel: WritableByteChannel = Channels.newChannel(new FileOutputStream(tmpFile))
    --- End diff --
    
    Do we need to honor spark.io.encryption.enabled here to encrypt the file on local disk?
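
To illustrate what the question above is asking for, here is a hedged sketch in Java of writing a temp file through a cipher stream when an encryption flag is set. It uses only the JDK's `javax.crypto` classes with AES in CTR mode as a stand-in; Spark's own encryption utilities are not shown in this thread. The class `EncryptedTempFile`, its methods, and the `encrypt` flag (standing in for `spark.io.encryption.enabled`) are all assumptions.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.GeneralSecurityException;
import java.security.SecureRandom;
import javax.crypto.Cipher;
import javax.crypto.CipherInputStream;
import javax.crypto.CipherOutputStream;
import javax.crypto.spec.IvParameterSpec;
import javax.crypto.spec.SecretKeySpec;

// Hypothetical helper: a JDK-only stand-in, not Spark's encryption code.
class EncryptedTempFile {
  private static final int IV_LENGTH = 16;

  private static Cipher cipher(int mode, byte[] key, byte[] iv) throws GeneralSecurityException {
    Cipher c = Cipher.getInstance("AES/CTR/NoPadding");
    c.init(mode, new SecretKeySpec(key, "AES"), new IvParameterSpec(iv));
    return c;
  }

  // Writes payload to a new temp file, encrypting it when the flag is set.
  static Path writeBlock(byte[] key, byte[] payload, boolean encrypt) {
    try {
      Path file = Files.createTempFile("block", ".tmp");
      try (OutputStream raw = Files.newOutputStream(file)) {
        if (!encrypt) {
          raw.write(payload);
        } else {
          // Store a fresh random IV in the clear ahead of the ciphertext.
          byte[] iv = new byte[IV_LENGTH];
          new SecureRandom().nextBytes(iv);
          raw.write(iv);
          try (OutputStream enc =
              new CipherOutputStream(raw, cipher(Cipher.ENCRYPT_MODE, key, iv))) {
            enc.write(payload);
          }
        }
      }
      return file;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    } catch (GeneralSecurityException e) {
      throw new IllegalStateException(e);
    }
  }

  // Reads the file back; with encrypted=false it returns the raw bytes on disk.
  static byte[] readBlock(Path file, byte[] key, boolean encrypted) {
    try (InputStream raw = Files.newInputStream(file)) {
      InputStream in = raw;
      if (encrypted) {
        byte[] iv = new byte[IV_LENGTH];
        new DataInputStream(raw).readFully(iv);
        in = new CipherInputStream(raw, cipher(Cipher.DECRYPT_MODE, key, iv));
      }
      ByteArrayOutputStream out = new ByteArrayOutputStream();
      byte[] buf = new byte[8192];
      for (int n = in.read(buf); n != -1; n = in.read(buf)) {
        out.write(buf, 0, n);
      }
      return out.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    } catch (GeneralSecurityException e) {
      throw new IllegalStateException(e);
    }
  }
}
```

The point of the sketch is only that the output stream is wrapped before any block data reaches the disk, so the plaintext never lands in the temp file.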
    
    
     



---



[GitHub] spark pull request #21451: [SPARK-24296][CORE] Replicate large blocks as a s...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21451#discussion_r207310923
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -404,6 +405,47 @@ private[spark] class BlockManager(
         putBytes(blockId, new ChunkedByteBuffer(data.nioByteBuffer()), level)(classTag)
       }
     
    +  override def putBlockDataAsStream(
    +      blockId: BlockId,
    +      level: StorageLevel,
    +      classTag: ClassTag[_]): StreamCallbackWithID = {
    +    // TODO if we're going to only put the data in the disk store, we should just write it directly
    +    // to the final location, but that would require a deeper refactor of this code.  So instead
    +    // we just write to a temp file, and call putBytes on the data in that file.
    +    val tmpFile = diskBlockManager.createTempLocalBlock()._2
    +    new StreamCallbackWithID {
    +      val channel: WritableByteChannel = Channels.newChannel(new FileOutputStream(tmpFile))
    +
    +      override def getID: String = blockId.name
    +
    +      override def onData(streamId: String, buf: ByteBuffer): Unit = {
    +        while (buf.hasRemaining) {
    +          channel.write(buf)
    +        }
    +      }
    +
    +      override def onComplete(streamId: String): Unit = {
    +        // Read the contents of the downloaded file as a buffer to put into the blockManager.
    +        // Note this is all happening inside the netty thread as soon as it reads the end of the
    +        // stream.
    +        channel.close()
    +        // TODO Even if we're only going to write the data to disk after this, we end up using a lot
    +        // of memory here.  We wont' get a jvm OOM, but might get killed by the OS / cluster
    --- End diff --
    
    spelling won't


---



[GitHub] spark issue #21451: [SPARK-24296][CORE] Replicate large blocks as a stream.

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21451
  
    **[Test build #94695 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94695/testReport)** for PR 21451 at commit [`034acb4`](https://github.com/apache/spark/commit/034acb40036b50e459b72e6a4b4156dc33244ae9).


---



[GitHub] spark issue #21451: [SPARK-24296][CORE] Replicate large blocks as a stream.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21451
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21451: [SPARK-24296][CORE][WIP] Replicate large blocks as a str...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21451
  
    **[Test build #91271 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91271/testReport)** for PR 21451 at commit [`68c5d5f`](https://github.com/apache/spark/commit/68c5d5f5f60da7cbc0ce356acd8e5ab31db70ea5).


---



[GitHub] spark issue #21451: [SPARK-24296][CORE][WIP] Replicate large blocks as a str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21451
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/93250/
    Test FAILed.


---



[GitHub] spark issue #21451: [SPARK-24296][CORE][WIP] Replicate large blocks as a str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21451
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91271/
    Test PASSed.


---



[GitHub] spark issue #21451: [SPARK-24296][CORE][WIP] Replicate large blocks as a str...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21451
  
    **[Test build #92398 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92398/testReport)** for PR 21451 at commit [`1cc0f3f`](https://github.com/apache/spark/commit/1cc0f3ffa2b563c54771a38c4dd9f2598b29f0db).


---



[GitHub] spark issue #21451: [SPARK-24296][CORE] Replicate large blocks as a stream.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21451
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1874/
    Test PASSed.


---



[GitHub] spark issue #21451: [SPARK-24296][CORE] Replicate large blocks as a stream.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21451
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #21451: [SPARK-24296][CORE] Replicate large blocks as a stream.

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21451
  
    **[Test build #94319 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94319/testReport)** for PR 21451 at commit [`6d059f2`](https://github.com/apache/spark/commit/6d059f25f3595243a8dd6195a5ee938a78e40d99).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #21451: [SPARK-24296][CORE] Replicate large blocks as a stream.

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on the issue:

    https://github.com/apache/spark/pull/21451
  
    ```
    org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 7, localhost, executor 1): java.io.IOException: org.apache.spark.SparkException: corrupt remote block broadcast_0_piece0 of broadcast_0: 1651574976 != 1165629262
    	at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1320)
    	at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:207)
    	at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66)
    	at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66)
    	at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96)
    	at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
    	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:84)
    	at org.apache.spark.scheduler.Task.run(Task.scala:121)
    	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$7.apply(Executor.scala:367)
    	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1347)
    	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:373)
    	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    	at java.lang.Thread.run(Thread.java:745)
    Caused by: org.apache.spark.SparkException: corrupt remote block broadcast_0_piece0 of broadcast_0: 1651574976 != 1165629262
    	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply$mcVI$sp(TorrentBroadcast.scala:167)
    	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:151)
    	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:151)
    	at scala.collection.immutable.List.foreach(List.scala:392)
    	at org.apache.spark.broadcast.TorrentBroadcast.org$apache$spark$broadcast$TorrentBroadcast$$readBlocks(TorrentBroadcast.scala:151)
    	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1$$anonfun$apply$2.apply(TorrentBroadcast.scala:231)
    	at scala.Option.getOrElse(Option.scala:121)
    	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:211)
    	at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1313)
    	... 13 more
    ```
    
    Is this possibly a bug introduced by this PR? After merging this PR, I saw this error multiple times. https://issues.apache.org/jira/browse/SPARK-25422
    
    cc @squito  @cloud-fan @vanzin @tgravescs
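
The `corrupt remote block ... 1651574976 != 1165629262` message in the trace above indicates that a checksum computed over a fetched block did not match the expected value. A minimal Java sketch of that kind of check follows, assuming an Adler-32 checksum from `java.util.zip`; the thread itself does not say which checksum is used, and the class `BlockChecksum` is hypothetical.

```java
import java.nio.ByteBuffer;
import java.util.zip.Adler32;

// Hypothetical helper: shows a checksum comparison, not Spark's implementation.
class BlockChecksum {

  // Computes an Adler-32 checksum without consuming the caller's buffer.
  static int checksum(ByteBuffer block) {
    Adler32 adler = new Adler32();
    // duplicate() shares the bytes but has its own position, so the caller's view is untouched.
    adler.update(block.duplicate());
    return (int) adler.getValue();
  }

  // Throws if the block's checksum differs from the one recorded by the sender.
  static void verify(String blockId, ByteBuffer block, int expected) {
    int actual = checksum(block);
    if (actual != expected) {
      throw new IllegalStateException(
          "corrupt remote block " + blockId + ": " + actual + " != " + expected);
    }
  }
}
```

A mismatch only shows that the bytes differ from what the sender recorded; it does not by itself identify where in the transfer path they were altered.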


---



[GitHub] spark issue #21451: [SPARK-24296][CORE] Replicate large blocks as a stream.

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21451
  
    **[Test build #94762 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94762/testReport)** for PR 21451 at commit [`c45e702`](https://github.com/apache/spark/commit/c45e702e43a5982010666bde08a11c1010b10099).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #21451: [SPARK-24296][CORE][WIP] Replicate large blocks as a str...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21451
  
    **[Test build #91268 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91268/testReport)** for PR 21451 at commit [`6ca6f8d`](https://github.com/apache/spark/commit/6ca6f8dc0dd7962194fc53e6cc9945a2f38e20dc).
     * This patch **fails Scala style tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `public class UploadBlockStream extends BlockTransferMessage `


---



[GitHub] spark issue #21451: [SPARK-24296][CORE][WIP] Replicate large blocks as a str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21451
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21451: [SPARK-24296][CORE][WIP] Replicate large blocks as a str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21451
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21451: [SPARK-24296][CORE][WIP] Replicate large blocks as a str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21451
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21451: [SPARK-24296][CORE] Replicate large blocks as a stream.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21451
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21451: [SPARK-24296][CORE][WIP] Replicate large blocks as a str...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21451
  
    **[Test build #93255 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93255/testReport)** for PR 21451 at commit [`335e26d`](https://github.com/apache/spark/commit/335e26d168dc99e7317175da8732ff691ff512f2).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `public class UploadBlockStream extends BlockTransferMessage `


---



[GitHub] spark issue #21451: [SPARK-24296][CORE] Replicate large blocks as a stream.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21451
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94725/
    Test FAILed.


---



[GitHub] spark issue #21451: [SPARK-24296][CORE][WIP] Replicate large blocks as a str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21451
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21451: [SPARK-24296][CORE][WIP] Replicate large blocks as a str...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21451
  
    **[Test build #91271 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91271/testReport)** for PR 21451 at commit [`68c5d5f`](https://github.com/apache/spark/commit/68c5d5f5f60da7cbc0ce356acd8e5ab31db70ea5).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #21451: [SPARK-24296][CORE] Replicate large blocks as a s...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21451#discussion_r208677974
  
    --- Diff: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/UploadBlockStream.java ---
    @@ -0,0 +1,89 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.network.shuffle.protocol;
    +
    +import java.util.Arrays;
    +
    +import com.google.common.base.Objects;
    +import io.netty.buffer.ByteBuf;
    +
    +import org.apache.spark.network.protocol.Encoders;
    +
    +// Needed by ScalaDoc. See SPARK-7726
    +import static org.apache.spark.network.shuffle.protocol.BlockTransferMessage.Type;
    +
    +/**
    + * A request to Upload a block, which the destintation should receive as a stream.
    --- End diff --
    
    nit: spelling destination


---



[GitHub] spark pull request #21451: [SPARK-24296][CORE] Replicate large blocks as a s...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21451#discussion_r207309822
  
    --- Diff: core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala ---
    @@ -73,10 +73,32 @@ class NettyBlockRpcServer(
             }
             val data = new NioManagedBuffer(ByteBuffer.wrap(uploadBlock.blockData))
             val blockId = BlockId(uploadBlock.blockId)
    +        logInfo(s"Receiving replicated block $blockId with level ${level} " +
    +          s"from ${client.getSocketAddress}")
             blockManager.putBlockData(blockId, data, level, classTag)
             responseContext.onSuccess(ByteBuffer.allocate(0))
         }
       }
     
    +  override def receiveStream(
    +      client: TransportClient,
    +      messageHeader: ByteBuffer,
    +    responseContext: RpcResponseCallback): StreamCallbackWithID = {
    --- End diff --
    
    fix spacing


---



[GitHub] spark issue #21451: [SPARK-24296][CORE][WIP] Replicate large blocks as a str...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on the issue:

    https://github.com/apache/spark/pull/21451
  
    fyi, I did finally run my scale tests again on a cluster, and shuffles, remote reads, and replication worked for blocks over 2gb (sorry got sidetracked with a few other things in the meantime)


---



[GitHub] spark issue #21451: [SPARK-24296][CORE][WIP] Replicate large blocks as a str...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21451
  
    **[Test build #91266 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91266/testReport)** for PR 21451 at commit [`7e517e4`](https://github.com/apache/spark/commit/7e517e4ea0ff66dc57121b54fdd71f8391edd8f2).


---



[GitHub] spark issue #21451: [SPARK-24296][CORE] Replicate large blocks as a stream.

Posted by squito <gi...@git.apache.org>.
Github user squito commented on the issue:

    https://github.com/apache/spark/pull/21451
  
    still looking -- will put comments on the jira so it's more visible


---



[GitHub] spark issue #21451: [SPARK-24296][CORE] Replicate large blocks as a stream.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21451
  
    Build finished. Test FAILed.


---



[GitHub] spark pull request #21451: [SPARK-24296][CORE][WIP] Replicate large blocks a...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21451#discussion_r198656910
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -723,7 +770,9 @@ private[spark] class BlockManager(
           }
     
           if (data != null) {
    -        return Some(new ChunkedByteBuffer(data))
    +        val chunkSize =
    +          conf.getSizeAsBytes("spark.storage.memoryMapLimitForTests", Int.MaxValue.toString).toInt
    --- End diff --
    
    Want to turn this into a config constant? I'm seeing it in a bunch of places.
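
One way to read the suggestion above, as a minimal Java sketch: keep the key and its default in a single holder so that call sites stop repeating the string. `StorageConfigSketch` and `chunkSize` are hypothetical names, a plain `Map` stands in for `SparkConf`, and unlike `getSizeAsBytes` in the diff this sketch only parses plain integers.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical holder for the config key; not Spark's actual config API. */
final class StorageConfigSketch {
    /** Key copied from the diff under review. */
    static final String MEMORY_MAP_LIMIT_FOR_TESTS = "spark.storage.memoryMapLimitForTests";
    /** Default mirrors the Int.MaxValue fallback used in the diff. */
    static final int DEFAULT_CHUNK_SIZE = Integer.MAX_VALUE;

    private StorageConfigSketch() {}

    /** Single lookup point, so callers never repeat the key or the default. */
    static int chunkSize(Map<String, String> conf) {
        String raw = conf.get(MEMORY_MAP_LIMIT_FOR_TESTS);
        // Assumption: the value is a plain integer byte count (no "1m"-style suffixes).
        return raw == null ? DEFAULT_CHUNK_SIZE : Integer.parseInt(raw.trim());
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        System.out.println(chunkSize(conf));
        conf.put(MEMORY_MAP_LIMIT_FOR_TESTS, "1048576");
        System.out.println(chunkSize(conf));
    }
}
```

With a holder like this, changing the key or the default touches one place instead of every call site.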


---



[GitHub] spark issue #21451: [SPARK-24296][CORE] Replicate large blocks as a stream.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21451
  
    Merged build finished. Test FAILed.


---



[GitHub] spark pull request #21451: [SPARK-24296][CORE][WIP] Replicate large blocks a...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21451#discussion_r198652991
  
    --- Diff: core/src/main/scala/org/apache/spark/network/BlockDataManager.scala ---
    @@ -43,6 +44,17 @@ trait BlockDataManager {
           level: StorageLevel,
           classTag: ClassTag[_]): Boolean
     
    +  /**
    +   * Put the given block that will be received as a stream.
    +   *
    +   * When this method is called, the data itself is not available -- it needs to be handled within
    +   * the callbacks of <code>streamData</code>.
    --- End diff --
    
    Need to update comment.


---



[GitHub] spark pull request #21451: [SPARK-24296][CORE] Replicate large blocks as a s...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21451#discussion_r209042028
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -404,6 +405,47 @@ private[spark] class BlockManager(
         putBytes(blockId, new ChunkedByteBuffer(data.nioByteBuffer()), level)(classTag)
       }
     
    +  override def putBlockDataAsStream(
    +      blockId: BlockId,
    +      level: StorageLevel,
    +      classTag: ClassTag[_]): StreamCallbackWithID = {
    +    // TODO if we're going to only put the data in the disk store, we should just write it directly
    +    // to the final location, but that would require a deeper refactor of this code.  So instead
    +    // we just write to a temp file, and call putBytes on the data in that file.
    +    val tmpFile = diskBlockManager.createTempLocalBlock()._2
    +    new StreamCallbackWithID {
    +      val channel: WritableByteChannel = Channels.newChannel(new FileOutputStream(tmpFile))
    --- End diff --
    
    Yeah, it sure looks like it.


---



[GitHub] spark issue #21451: [SPARK-24296][CORE][WIP] Replicate large blocks as a str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21451
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91268/
    Test FAILed.


---



[GitHub] spark issue #21451: [SPARK-24296][CORE] Replicate large blocks as a stream.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21451
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94778/
    Test PASSed.


---



[GitHub] spark issue #21451: [SPARK-24296][CORE][WIP] Replicate large blocks as a str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21451
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/92407/
    Test FAILed.


---



[GitHub] spark issue #21451: [SPARK-24296][CORE][WIP] Replicate large blocks as a str...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on the issue:

    https://github.com/apache/spark/pull/21451
  
    @mridulm @jerryshao @felixcheung This is the last one in the 2GB block limit series.  I just rebased to include the updates to https://github.com/apache/spark/pull/21440.  I will also run my tests on a cluster with this: https://github.com/squito/spark_2gb_test/blob/master/src/main/scala/com/cloudera/sparktest/LargeBlocks.scala
    I will report the results from that, probably tomorrow.
    
    thanks for all the reviews!


---



[GitHub] spark pull request #21451: [SPARK-24296][CORE] Replicate large blocks as a s...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21451#discussion_r210153952
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -406,6 +407,61 @@ private[spark] class BlockManager(
         putBytes(blockId, new ChunkedByteBuffer(data.nioByteBuffer()), level)(classTag)
       }
     
    +  override def putBlockDataAsStream(
    +      blockId: BlockId,
    +      level: StorageLevel,
    +      classTag: ClassTag[_]): StreamCallbackWithID = {
    +    // TODO if we're going to only put the data in the disk store, we should just write it directly
    +    // to the final location, but that would require a deeper refactor of this code.  So instead
    +    // we just write to a temp file, and call putBytes on the data in that file.
    +    val tmpFile = diskBlockManager.createTempLocalBlock()._2
    +    val channel = new CountingWritableChannel(
    +      Channels.newChannel(serializerManager.wrapForEncryption(new FileOutputStream(tmpFile))))
    +    logTrace(s"Streaming block $blockId to tmp file $tmpFile")
    +    new StreamCallbackWithID {
    +
    +      override def getID: String = blockId.name
    +
    +      override def onData(streamId: String, buf: ByteBuffer): Unit = {
    +        while (buf.hasRemaining) {
    +          channel.write(buf)
    +        }
    +      }
    +
    +      override def onComplete(streamId: String): Unit = {
    +        logTrace(s"Done receiving block $blockId, now putting into local blockManager")
    +        // Read the contents of the downloaded file as a buffer to put into the blockManager.
    +        // Note this is all happening inside the netty thread as soon as it reads the end of the
    +        // stream.
    +        channel.close()
    +        // TODO SPARK-25035 Even if we're only going to write the data to disk after this, we end up
    +        // using a lot of memory here.  We won't get a jvm OOM, but might get killed by the
    +        // OS / cluster manager.  We could at least read the tmp file as a stream.
    +        val buffer = securityManager.getIOEncryptionKey() match {
    +          case Some(key) =>
    +            // we need to pass in the size of the unencrypted block
    +            val blockSize = channel.getCount
    +            val allocator = level.memoryMode match {
    +              case MemoryMode.ON_HEAP => ByteBuffer.allocate _
    +              case MemoryMode.OFF_HEAP => Platform.allocateDirectBuffer _
    +            }
    +            new EncryptedBlockData(tmpFile, blockSize, conf, key).toChunkedByteBuffer(allocator)
    --- End diff --
    
    Yeah, you store the entire file in memory (after decrypting).  It's not memory mapped either, so it'll probably be a regular OOM (depending on memory mode).  Updated the comment.
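
The diff above wraps the temp-file channel in a `CountingWritableChannel` so that `channel.getCount` can supply the size of the unencrypted block. A minimal Java sketch of that idea follows. `CountingChannelSketch`, `writeFully`, and `countBytes` are hypothetical names, the sink is an in-memory stream rather than an encrypted file, and this is not Spark's actual class.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;

/** Hypothetical counting wrapper: forwards writes and tallies the bytes accepted. */
final class CountingChannelSketch implements WritableByteChannel {
    private final WritableByteChannel sink;
    private long count = 0L;

    CountingChannelSketch(WritableByteChannel sink) {
        this.sink = sink;
    }

    @Override
    public int write(ByteBuffer src) throws IOException {
        int written = sink.write(src);
        if (written > 0) {
            count += written;  // bytes handed to the sink, i.e. before any encryption layer
        }
        return written;
    }

    long getCount() { return count; }

    @Override public boolean isOpen() { return sink.isOpen(); }

    @Override public void close() throws IOException { sink.close(); }

    /** Drains buf completely, as onData does in the diff; one write call may be partial. */
    static void writeFully(WritableByteChannel ch, ByteBuffer buf) {
        try {
            while (buf.hasRemaining()) {
                ch.write(buf);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Demo helper: streams the chunks into an in-memory sink and returns the tally. */
    static long countBytes(byte[]... chunks) {
        CountingChannelSketch ch =
            new CountingChannelSketch(Channels.newChannel(new ByteArrayOutputStream()));
        for (byte[] chunk : chunks) {
            writeFully(ch, ByteBuffer.wrap(chunk));
        }
        return ch.getCount();
    }

    public static void main(String[] args) {
        System.out.println(countBytes(new byte[10], new byte[5]));  // prints 15
    }
}
```

Counting at the wrapper matters because the size of an encrypted file on disk generally differs from the size of the plaintext that went in.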


---



[GitHub] spark issue #21451: [SPARK-24296][CORE][WIP] Replicate large blocks as a str...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on the issue:

    https://github.com/apache/spark/pull/21451
  
    retest this please


---



[GitHub] spark issue #21451: [SPARK-24296][CORE][WIP] Replicate large blocks as a str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21451
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1105/
    Test PASSed.


---



[GitHub] spark issue #21451: [SPARK-24296][CORE] Replicate large blocks as a stream.

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on the issue:

    https://github.com/apache/spark/pull/21451
  
    test this please


---



[GitHub] spark issue #21451: [SPARK-24296][CORE] Replicate large blocks as a stream.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21451
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #21451: [SPARK-24296][CORE] Replicate large blocks as a s...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21451#discussion_r207968882
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -404,6 +405,47 @@ private[spark] class BlockManager(
         putBytes(blockId, new ChunkedByteBuffer(data.nioByteBuffer()), level)(classTag)
       }
     
    +  override def putBlockDataAsStream(
    +      blockId: BlockId,
    +      level: StorageLevel,
    +      classTag: ClassTag[_]): StreamCallbackWithID = {
    +    // TODO if we're going to only put the data in the disk store, we should just write it directly
    +    // to the final location, but that would require a deeper refactor of this code.  So instead
    +    // we just write to a temp file, and call putBytes on the data in that file.
    +    val tmpFile = diskBlockManager.createTempLocalBlock()._2
    +    new StreamCallbackWithID {
    +      val channel: WritableByteChannel = Channels.newChannel(new FileOutputStream(tmpFile))
    +
    +      override def getID: String = blockId.name
    +
    +      override def onData(streamId: String, buf: ByteBuffer): Unit = {
    +        while (buf.hasRemaining) {
    +          channel.write(buf)
    +        }
    +      }
    +
    +      override def onComplete(streamId: String): Unit = {
    +        // Read the contents of the downloaded file as a buffer to put into the blockManager.
    +        // Note this is all happening inside the netty thread as soon as it reads the end of the
    +        // stream.
    +        channel.close()
    +        // TODO Even if we're only going to write the data to disk after this, we end up using a lot
    +        // of memory here.  We won't get a jvm OOM, but might get killed by the OS / cluster
    --- End diff --
    
    filed SPARK-25035


---



[GitHub] spark issue #21451: [SPARK-24296][CORE] Replicate large blocks as a stream.

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21451
  
    **[Test build #94778 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94778/testReport)** for PR 21451 at commit [`44149a5`](https://github.com/apache/spark/commit/44149a51fd817324488508ba006da2d3272db490).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #21451: [SPARK-24296][CORE][WIP] Replicate large blocks as a str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21451
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3678/
    Test PASSed.


---



[GitHub] spark issue #21451: [SPARK-24296][CORE][WIP] Replicate large blocks as a str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21451
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21451: [SPARK-24296][CORE] Replicate large blocks as a stream.

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21451
  
    **[Test build #94695 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94695/testReport)** for PR 21451 at commit [`034acb4`](https://github.com/apache/spark/commit/034acb40036b50e459b72e6a4b4156dc33244ae9).
     * This patch **fails from timeout after a configured wait of \`300m\`**.
     * This patch **does not merge cleanly**.
     * This patch adds no public classes.


---



[GitHub] spark issue #21451: [SPARK-24296][CORE] Replicate large blocks as a stream.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21451
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/2186/
    Test FAILed.


---



[GitHub] spark pull request #21451: [SPARK-24296][CORE] Replicate large blocks as a s...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21451#discussion_r210097183
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -406,6 +407,61 @@ private[spark] class BlockManager(
         putBytes(blockId, new ChunkedByteBuffer(data.nioByteBuffer()), level)(classTag)
       }
     
    +  override def putBlockDataAsStream(
    +      blockId: BlockId,
    +      level: StorageLevel,
    +      classTag: ClassTag[_]): StreamCallbackWithID = {
    +    // TODO if we're going to only put the data in the disk store, we should just write it directly
    +    // to the final location, but that would require a deeper refactor of this code.  So instead
    +    // we just write to a temp file, and call putBytes on the data in that file.
    +    val tmpFile = diskBlockManager.createTempLocalBlock()._2
    +    val channel = new CountingWritableChannel(
    +      Channels.newChannel(serializerManager.wrapForEncryption(new FileOutputStream(tmpFile))))
    +    logTrace(s"Streaming block $blockId to tmp file $tmpFile")
    +    new StreamCallbackWithID {
    +
    +      override def getID: String = blockId.name
    +
    +      override def onData(streamId: String, buf: ByteBuffer): Unit = {
    +        while (buf.hasRemaining) {
    +          channel.write(buf)
    +        }
    +      }
    +
    +      override def onComplete(streamId: String): Unit = {
    +        logTrace(s"Done receiving block $blockId, now putting into local blockManager")
    +        // Read the contents of the downloaded file as a buffer to put into the blockManager.
    +        // Note this is all happening inside the netty thread as soon as it reads the end of the
    +        // stream.
    +        channel.close()
    +        // TODO SPARK-25035 Even if we're only going to write the data to disk after this, we end up
    +        // using a lot of memory here.  We won't get a jvm OOM, but might get killed by the
    +        // OS / cluster manager.  We could at least read the tmp file as a stream.
    +        val buffer = securityManager.getIOEncryptionKey() match {
    +          case Some(key) =>
    +            // we need to pass in the size of the unencrypted block
    +            val blockSize = channel.getCount
    +            val allocator = level.memoryMode match {
    +              case MemoryMode.ON_HEAP => ByteBuffer.allocate _
    +              case MemoryMode.OFF_HEAP => Platform.allocateDirectBuffer _
    +            }
    +            new EncryptedBlockData(tmpFile, blockSize, conf, key).toChunkedByteBuffer(allocator)
    --- End diff --
    
    `toChunkedByteBuffer` is also pretty memory-hungry, right? If I read the code right, you'll end up needing enough memory to hold the entire file.
    
    This is probably ok for now, but should probably mention it in your TODO above.
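
For comparison, the TODO's idea of reading the temp file as a stream could look roughly like the Java sketch below, which keeps memory use bounded by one reusable buffer instead of the whole file. `ChunkedFileReadSketch`, `streamFile`, and `demo` are hypothetical names. The sketch ignores encryption and only counts the bytes where a real consumer would hand each chunk to the store.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical sketch: read a file through one fixed-size buffer. */
final class ChunkedFileReadSketch {
    private ChunkedFileReadSketch() {}

    /** Streams the file chunk by chunk and returns the total number of bytes seen. */
    static long streamFile(Path file, int chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        byte[] chunk = new byte[chunkSize];  // the only buffer, reused for every read
        long total = 0L;
        try (InputStream in = Files.newInputStream(file)) {
            int n;
            while ((n = in.read(chunk)) != -1) {
                // A real consumer would process chunk[0..n) here.
                total += n;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return total;
    }

    /** Demo helper: writes fileSize zero bytes to a temp file, then streams it back. */
    static long demo(int fileSize, int chunkSize) {
        try {
            Path tmp = Files.createTempFile("block-sketch", ".bin");
            try {
                Files.write(tmp, new byte[fileSize]);
                return streamFile(tmp, chunkSize);
            } finally {
                Files.deleteIfExists(tmp);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo(10000, 4096));
    }
}
```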


---



[GitHub] spark issue #21451: [SPARK-24296][CORE][WIP] Replicate large blocks as a str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21451
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21451: [SPARK-24296][CORE] Replicate large blocks as a stream.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21451
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94695/
    Test FAILed.


---



[GitHub] spark issue #21451: [SPARK-24296][CORE] Replicate large blocks as a stream.

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21451
  
    **[Test build #94762 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94762/testReport)** for PR 21451 at commit [`c45e702`](https://github.com/apache/spark/commit/c45e702e43a5982010666bde08a11c1010b10099).


---



[GitHub] spark issue #21451: [SPARK-24296][CORE] Replicate large blocks as a stream.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21451
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/2164/
    Test PASSed.


---



[GitHub] spark issue #21451: [SPARK-24296][CORE][WIP] Replicate large blocks as a str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21451
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #21451: [SPARK-24296][CORE] Replicate large blocks as a stream.

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21451
  
    **[Test build #94319 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94319/testReport)** for PR 21451 at commit [`6d059f2`](https://github.com/apache/spark/commit/6d059f25f3595243a8dd6195a5ee938a78e40d99).


---



[GitHub] spark issue #21451: [SPARK-24296][CORE][WIP] Replicate large blocks as a str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21451
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #21451: [SPARK-24296][CORE][WIP] Replicate large blocks a...

Posted by witgo <gi...@git.apache.org>.
Github user witgo commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21451#discussion_r192279111
  
    --- Diff: common/network-common/src/main/java/org/apache/spark/network/server/RpcHandler.java ---
    @@ -38,15 +38,24 @@
        *
        * This method will not be called in parallel for a single TransportClient (i.e., channel).
        *
    +   * The rpc *might* include a data stream in <code>streamData</code> (eg. for uploading a large
    +   * amount of data which should not be buffered in memory here).  Any errors while handling the
    +   * streamData will lead to failing this entire connection -- all other in-flight rpcs will fail.
    +   * If stream data is not null, you *must* call <code>streamData.registerStreamCallback</code>
    +   * before this method returns.
    +   *
        * @param client A channel client which enables the handler to make requests back to the sender
        *               of this RPC. This will always be the exact same object for a particular channel.
        * @param message The serialized bytes of the RPC.
    +   * @param streamData StreamData if there is data which is meant to be read via a StreamCallback;
    +   *                   otherwise it is null.
        * @param callback Callback which should be invoked exactly once upon success or failure of the
        *                 RPC.
        */
       public abstract void receive(
           TransportClient client,
           ByteBuffer message,
    +      StreamData streamData,
    --- End diff --
    
    What about incorporating parameter `message` into parameter `streamData`?
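
To make the contract in the quoted Javadoc concrete, here is a minimal Java sketch under loudly stated assumptions: `StreamRpcSketch` and its nested `Callback`, `StreamData`, and `Handler` types are hypothetical stand-ins, not the classes in this patch. It shows a small `message` header arriving alongside a nullable `streamData`, with the dispatcher rejecting a handler that returns without registering a callback.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical model of the "register a callback before returning" rule. */
final class StreamRpcSketch {
    private StreamRpcSketch() {}

    /** Hypothetical callback, loosely modeled on the StreamCallback named in the diff. */
    interface Callback {
        void onData(ByteBuffer buf);
        void onComplete();
    }

    /** Hypothetical stand-in for StreamData: holds at most one callback. */
    static final class StreamData {
        private Callback callback;

        void registerStreamCallback(Callback cb) {
            if (callback != null) {
                throw new IllegalStateException("callback already registered");
            }
            callback = cb;
        }

        boolean hasCallback() { return callback != null; }

        /** Feeds the chunks to the registered callback, then signals completion. */
        void deliver(ByteBuffer... chunks) {
            for (ByteBuffer chunk : chunks) {
                callback.onData(chunk);
            }
            callback.onComplete();
        }
    }

    /** Hypothetical handler: the header arrives in message, the bulk data in streamData. */
    interface Handler {
        void receive(ByteBuffer message, StreamData streamData);
    }

    /** Dispatches one rpc and enforces the registration rule when a stream is present. */
    static void dispatch(Handler handler, ByteBuffer message, StreamData streamData) {
        handler.receive(message, streamData);
        if (streamData != null && !streamData.hasCallback()) {
            throw new IllegalStateException("handler returned without registering a callback");
        }
    }

    /** Demo helper: returns the number of streamed bytes the handler observed. */
    static long demo(boolean withStream) {
        long[] received = {0L};
        StreamData sd = withStream ? new StreamData() : null;
        Handler handler = (msg, data) -> {
            if (data != null) {
                data.registerStreamCallback(new Callback() {
                    @Override public void onData(ByteBuffer buf) { received[0] += buf.remaining(); }
                    @Override public void onComplete() { }
                });
            }
        };
        dispatch(handler, ByteBuffer.wrap("upload".getBytes(StandardCharsets.UTF_8)), sd);
        if (sd != null) {
            sd.deliver(ByteBuffer.allocate(8), ByteBuffer.allocate(4));
        }
        return received[0];
    }

    public static void main(String[] args) {
        System.out.println(demo(true));
        System.out.println(demo(false));
    }
}
```

In this sketch the header stays a separate argument so that rpcs without a stream keep the same shape; whether folding `message` into `streamData` would be cleaner is exactly the open question raised in the comment.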


---



[GitHub] spark issue #21451: [SPARK-24296][CORE][WIP] Replicate large blocks as a str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21451
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #21451: [SPARK-24296][CORE] Replicate large blocks as a stream.

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21451
  
    **[Test build #94725 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94725/testReport)** for PR 21451 at commit [`c45e702`](https://github.com/apache/spark/commit/c45e702e43a5982010666bde08a11c1010b10099).


---



[GitHub] spark issue #21451: [SPARK-24296][CORE][WIP] Replicate large blocks as a str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21451
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1108/
    Test PASSed.


---



[GitHub] spark pull request #21451: [SPARK-24296][CORE] Replicate large blocks as a s...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21451#discussion_r207309499
  
    --- Diff: core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala ---
    @@ -73,10 +73,32 @@ class NettyBlockRpcServer(
             }
             val data = new NioManagedBuffer(ByteBuffer.wrap(uploadBlock.blockData))
             val blockId = BlockId(uploadBlock.blockId)
    +        logInfo(s"Receiving replicated block $blockId with level ${level} " +
    --- End diff --
    
    This seems like it could be pretty verbose; put it at debug or trace?
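
As an illustration of the lower-level alternative, here is a minimal Java sketch using `java.util.logging` rather than Spark's own logging trait. `ReplicationLogSketch`, `logReceived`, and `setVerbose` are hypothetical names, and `FINE` is used as a rough analogue of debug. The message is passed as a `Supplier`, so the string is only built when that level is enabled.

```java
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical sketch: per-block messages logged at a debug-like level. */
final class ReplicationLogSketch {
    private static final Logger LOG = Logger.getLogger("ReplicationLogSketch");

    /** Counts how often the message string was actually built (for demonstration only). */
    static int messagesBuilt = 0;

    private ReplicationLogSketch() {}

    /** Turns FINE logging on, or falls back to the inherited level when false. */
    static void setVerbose(boolean verbose) {
        LOG.setLevel(verbose ? Level.FINE : null);
    }

    static void logReceived(String blockId, String level) {
        Supplier<String> message = () -> {
            messagesBuilt++;
            return "Receiving replicated block " + blockId + " with level " + level;
        };
        // The supplier is only invoked if FINE is loggable for this logger.
        LOG.log(Level.FINE, message);
    }

    public static void main(String[] args) {
        logReceived("rdd_0_0", "MEMORY_ONLY_2");
        setVerbose(true);
        logReceived("rdd_0_1", "MEMORY_ONLY_2");
        System.out.println(messagesBuilt);
    }
}
```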


---



[GitHub] spark issue #21451: [SPARK-24296][CORE][WIP] Replicate large blocks as a str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21451
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/92398/
    Test PASSed.


---



[GitHub] spark pull request #21451: [SPARK-24296][CORE][WIP] Replicate large blocks a...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21451#discussion_r198652931
  
    --- Diff: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/UploadBlockStream.java ---
    @@ -0,0 +1,89 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.network.shuffle.protocol;
    +
    +import java.util.Arrays;
    +
    +import com.google.common.base.Objects;
    +import io.netty.buffer.ByteBuf;
    +
    +import org.apache.spark.network.protocol.Encoders;
    +
    +// Needed by ScalaDoc. See SPARK-7726
    +import static org.apache.spark.network.shuffle.protocol.BlockTransferMessage.Type;
    +
    +/**
    + * A request to Upload a block, which the destination should receive as a stream.
    + *
    + * The actual block data is not contained here.  It is in the streamData in the RpcHandler.receive()
    --- End diff --
    
    Need to update to match API.


---



[GitHub] spark issue #21451: [SPARK-24296][CORE] Replicate large blocks as a stream.

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21451
  
    **[Test build #94725 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94725/testReport)** for PR 21451 at commit [`c45e702`](https://github.com/apache/spark/commit/c45e702e43a5982010666bde08a11c1010b10099).
     * This patch **fails due to an unknown error code, -9**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #21451: [SPARK-24296][CORE][WIP] Replicate large blocks as a str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21451
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/531/
    Test PASSed.


---



[GitHub] spark issue #21451: [SPARK-24296][CORE][WIP] Replicate large blocks as a str...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21451
  
    **[Test build #92398 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92398/testReport)** for PR 21451 at commit [`1cc0f3f`](https://github.com/apache/spark/commit/1cc0f3ffa2b563c54771a38c4dd9f2598b29f0db).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `public class UploadBlockStream extends BlockTransferMessage `


---



[GitHub] spark pull request #21451: [SPARK-24296][CORE] Replicate large blocks as a s...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21451#discussion_r210095275
  
    --- Diff: core/src/test/scala/org/apache/spark/security/EncryptionFunSuite.scala ---
    @@ -28,11 +28,15 @@ trait EncryptionFunSuite {
        * for the test to modify the provided SparkConf.
        */
       final protected def encryptionTest(name: String)(fn: SparkConf => Unit) {
    +    encryptionTestHelper(name) { case (name, conf) =>
    +        test(name)(fn(conf))
    --- End diff --
    
    nit: indentation


---



[GitHub] spark issue #21451: [SPARK-24296][CORE] Replicate large blocks as a stream.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21451
  
    Build finished. Test PASSed.


---



[GitHub] spark issue #21451: [SPARK-24296][CORE] Replicate large blocks as a stream.

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the issue:

    https://github.com/apache/spark/pull/21451
  
    LGTM. Merging to master.


---



[GitHub] spark issue #21451: [SPARK-24296][CORE] Replicate large blocks as a stream.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21451
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/2141/
    Test PASSed.


---



[GitHub] spark issue #21451: [SPARK-24296][CORE][WIP] Replicate large blocks as a str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21451
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91266/
    Test FAILed.


---



[GitHub] spark issue #21451: [SPARK-24296][CORE][WIP] Replicate large blocks as a str...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21451
  
    **[Test build #91266 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91266/testReport)** for PR 21451 at commit [`7e517e4`](https://github.com/apache/spark/commit/7e517e4ea0ff66dc57121b54fdd71f8391edd8f2).
     * This patch **fails MiMa tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `public class UploadBlockStream extends BlockTransferMessage `


---



[GitHub] spark issue #21451: [SPARK-24296][CORE] Replicate large blocks as a stream.

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on the issue:

    https://github.com/apache/spark/pull/21451
  
    https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test/job/spark-branch-2.4-test-maven-hadoop-2.7/21/


---



[GitHub] spark issue #21451: [SPARK-24296][CORE] Replicate large blocks as a stream.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21451
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/2197/
    Test PASSed.


---



[GitHub] spark issue #21451: [SPARK-24296][CORE][WIP] Replicate large blocks as a str...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on the issue:

    https://github.com/apache/spark/pull/21451
  
    This is the change for SPARK-24296, on top of https://github.com/apache/spark/pull/21346 and https://github.com/apache/spark/pull/21440
    
    Posting here for testing.  Reviews are welcome on this commit, which has just the relevant changes: https://github.com/apache/spark/pull/21451/commits/7e517e4ea0ff66dc57121b54fdd71f8391edd8f2


---



[GitHub] spark issue #21451: [SPARK-24296][CORE][WIP] Replicate large blocks as a str...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21451
  
    **[Test build #93250 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93250/testReport)** for PR 21451 at commit [`335e26d`](https://github.com/apache/spark/commit/335e26d168dc99e7317175da8732ff691ff512f2).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `public class UploadBlockStream extends BlockTransferMessage `


---



[GitHub] spark issue #21451: [SPARK-24296][CORE][WIP] Replicate large blocks as a str...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21451
  
    **[Test build #92427 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92427/testReport)** for PR 21451 at commit [`bdfa6ff`](https://github.com/apache/spark/commit/bdfa6ff047f3f92325b628498c45fc2467b64b3b).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #21451: [SPARK-24296][CORE][WIP] Replicate large blocks a...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21451#discussion_r198657121
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -1341,12 +1390,16 @@ private[spark] class BlockManager(
           try {
             val onePeerStartTime = System.nanoTime
             logTrace(s"Trying to replicate $blockId of ${data.size} bytes to $peer")
    +        // This thread keeps a lock on the block, so we do not want the netty thread to unlock
    +        // block when it finishes sending the message.
    +        val mb = new BlockManagerManagedBuffer(blockInfoManager, blockId, data, false,
    --- End diff --
    
    s/mb/buffer
    
    Confusing in a place that deals with sizes all over.


---



[GitHub] spark issue #21451: [SPARK-24296][CORE] Replicate large blocks as a stream.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21451
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94319/
    Test PASSed.


---



[GitHub] spark issue #21451: [SPARK-24296][CORE] Replicate large blocks as a stream.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21451
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21451: [SPARK-24296][CORE] Replicate large blocks as a stream.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21451
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94762/
    Test FAILed.


---



[GitHub] spark pull request #21451: [SPARK-24296][CORE][WIP] Replicate large blocks a...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21451#discussion_r192136490
  
    --- Diff: common/network-common/src/main/java/org/apache/spark/network/server/RpcHandler.java ---
    @@ -38,15 +38,24 @@
        *
        * This method will not be called in parallel for a single TransportClient (i.e., channel).
        *
    +   * The rpc *might* included a data stream in <code>streamData</code> (eg. for uploading a large
    +   * amount of data which should not be buffered in memory here).  Any errors while handling the
    +   * streamData will lead to failing this entire connection -- all other in-flight rpcs will fail.
    +   * If stream data is not null, you *must* call <code>streamData.registerStreamCallback</code>
    +   * before this method returns.
    +   *
        * @param client A channel client which enables the handler to make requests back to the sender
        *               of this RPC. This will always be the exact same object for a particular channel.
        * @param message The serialized bytes of the RPC.
    +   * @param streamData StreamData if there is data which is meant to be read via a StreamCallback;
    +   *                   otherwise it is null.
        * @param callback Callback which should be invoked exactly once upon success or failure of the
        *                 RPC.
        */
       public abstract void receive(
           TransportClient client,
           ByteBuffer message,
    +      StreamData streamData,
    --- End diff --
    
    yes, there are other ways to do this, but I wanted to leave the old code paths relatively untouched to minimize the behavior change / risk of bugs.  I also think it's helpful to clearly separate the portion that is read entirely into memory from the streaming portion; it makes it easier to work with.  Also, InputStream suggests the data is getting pulled instead of pushed.
    
    your earlier approach definitely gave a lot of inspiration for this change.  I'm hoping that making it a more isolated change helps us make progress here.
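
The split described above (a small message read fully into memory, plus an optional stream whose chunks are pushed to a registered callback) can be sketched as follows. This is only an illustration under stated assumptions: `ChunkCallback`, `StreamSource`, and `Handler` are hypothetical stand-ins written for this sketch, loosely shaped like the `StreamCallback` and `streamData.registerStreamCallback` names in the diff, and they are not Spark's actual classes.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class StreamUploadSketch {
    /** Hypothetical push-style callback, loosely shaped like the callback in the diff. */
    interface ChunkCallback {
        void onData(String streamId, ByteBuffer buf);
        void onComplete(String streamId);
    }

    /** Hypothetical stand-in for the streamData argument: it only accepts a callback. */
    static final class StreamSource {
        private ChunkCallback callback;

        void registerStreamCallback(ChunkCallback cb) {
            this.callback = cb;
        }

        /** The transport pushes chunks as they arrive; the handler never pulls. */
        void push(String streamId, byte[][] chunks) {
            if (callback == null) {
                throw new IllegalStateException("callback must be registered before data arrives");
            }
            for (byte[] chunk : chunks) {
                callback.onData(streamId, ByteBuffer.wrap(chunk));
            }
            callback.onComplete(streamId);
        }
    }

    /** Hypothetical handler: decodes the in-memory message, then only counts streamed bytes. */
    static final class Handler {
        String header;
        long bytesSeen;
        int chunksSeen;
        boolean complete;

        void receive(ByteBuffer message, StreamSource streamData) {
            // The message part is small and fully buffered, so it can be decoded eagerly.
            header = StandardCharsets.UTF_8.decode(message).toString();
            if (streamData != null) {
                // Register before returning, as the Javadoc in the diff requires.
                streamData.registerStreamCallback(new ChunkCallback() {
                    @Override
                    public void onData(String streamId, ByteBuffer buf) {
                        bytesSeen += buf.remaining();
                        chunksSeen++;
                        buf.position(buf.limit()); // mark the chunk as consumed
                    }

                    @Override
                    public void onComplete(String streamId) {
                        complete = true;
                    }
                });
            }
        }
    }

    static Handler demo() {
        Handler handler = new Handler();
        StreamSource stream = new StreamSource();
        handler.receive(ByteBuffer.wrap("block-1".getBytes(StandardCharsets.UTF_8)), stream);
        stream.push("block-1", new byte[][] {{1, 2, 3, 4}, {5, 6, 7}, {8, 9}});
        return handler;
    }

    public static void main(String[] args) {
        Handler h = demo();
        System.out.println(h.header + " bytes=" + h.bytesSeen + " complete=" + h.complete);
    }
}
```

The handler here never holds more than one chunk at a time, which is the property the push model is meant to give; an InputStream-based handler would instead block while pulling.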


---



[GitHub] spark pull request #21451: [SPARK-24296][CORE] Replicate large blocks as a s...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21451#discussion_r207309551
  
    --- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala ---
    @@ -567,4 +567,10 @@ package object config {
           .intConf
           .checkValue(v => v > 0, "The value should be a positive integer.")
           .createWithDefault(2000)
    +
    +  private[spark] val MEMORY_MAP_LIMIT_FOR_TESTS =
    +    ConfigBuilder("spark.storage.memoryMapLimitForTests")
    +      .internal()
    --- End diff --
    
    add a .doc that says this is for testing only


---



[GitHub] spark issue #21451: [SPARK-24296][CORE][WIP] Replicate large blocks as a str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21451
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #21451: [SPARK-24296][CORE] Replicate large blocks as a s...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21451#discussion_r207316984
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -404,6 +405,47 @@ private[spark] class BlockManager(
         putBytes(blockId, new ChunkedByteBuffer(data.nioByteBuffer()), level)(classTag)
       }
     
    +  override def putBlockDataAsStream(
    +      blockId: BlockId,
    +      level: StorageLevel,
    +      classTag: ClassTag[_]): StreamCallbackWithID = {
    +    // TODO if we're going to only put the data in the disk store, we should just write it directly
    +    // to the final location, but that would require a deeper refactor of this code.  So instead
    +    // we just write to a temp file, and call putBytes on the data in that file.
    +    val tmpFile = diskBlockManager.createTempLocalBlock()._2
    +    new StreamCallbackWithID {
    +      val channel: WritableByteChannel = Channels.newChannel(new FileOutputStream(tmpFile))
    +
    +      override def getID: String = blockId.name
    +
    +      override def onData(streamId: String, buf: ByteBuffer): Unit = {
    +        while (buf.hasRemaining) {
    +          channel.write(buf)
    +        }
    +      }
    +
    +      override def onComplete(streamId: String): Unit = {
    +        // Read the contents of the downloaded file as a buffer to put into the blockManager.
    +        // Note this is all happening inside the netty thread as soon as it reads the end of the
    +        // stream.
    +        channel.close()
    +        // TODO Even if we're only going to write the data to disk after this, we end up using a lot
    +        // of memory here.  We wont' get a jvm OOM, but might get killed by the OS / cluster
    --- End diff --
    
    yeah, agreed, this could be an issue with YARN since the overhead memory might not be big enough. Can we file a JIRA to specifically track this?
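
The write-to-temp-file pattern in the quoted diff can be tried outside Spark with a small standalone sketch. `TempFileSink` and its method names are hypothetical and exist only for this example; the part taken from the diff is the `while (buf.hasRemaining())` loop around `channel.write(buf)`, which is needed because a single `write` call is not guaranteed to drain the buffer.

```java
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical sink that spools pushed chunks to a temp file instead of the heap. */
public class TempFileSink implements AutoCloseable {
    private final Path tmpFile;
    private final WritableByteChannel channel;

    public TempFileSink() throws IOException {
        tmpFile = Files.createTempFile("block-", ".tmp");
        channel = Channels.newChannel(new FileOutputStream(tmpFile.toFile()));
    }

    /** Called once per chunk; loops because write() may accept only part of the buffer. */
    public void onData(ByteBuffer buf) throws IOException {
        while (buf.hasRemaining()) {
            channel.write(buf);
        }
    }

    /** Called at end of stream; closes the channel and hands back the spooled file. */
    public Path onComplete() throws IOException {
        channel.close();
        return tmpFile;
    }

    /** Closing an already closed channel has no effect, so this is safe after onComplete(). */
    @Override
    public void close() throws IOException {
        channel.close();
    }

    /** Pushes two chunks, then returns the size of the spooled file. */
    public static long demo() throws IOException {
        try (TempFileSink sink = new TempFileSink()) {
            sink.onData(ByteBuffer.wrap(new byte[] {1, 2, 3}));
            sink.onData(ByteBuffer.wrap(new byte[] {4, 5}));
            Path file = sink.onComplete();
            long size = Files.size(file);
            Files.delete(file);
            return size;
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("spooled bytes: " + demo());
    }
}
```

The try-with-resources block in `demo()` also closes the channel if a write fails partway through, so the file handle is not leaked on the error path.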


---



[GitHub] spark issue #21451: [SPARK-24296][CORE][WIP] Replicate large blocks as a str...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21451
  
    **[Test build #92407 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92407/testReport)** for PR 21451 at commit [`fa1928a`](https://github.com/apache/spark/commit/fa1928aa48655ca2fb036759260cfa71324ed37c).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #21451: [SPARK-24296][CORE][WIP] Replicate large blocks as a str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21451
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/93255/
    Test PASSed.


---



[GitHub] spark pull request #21451: [SPARK-24296][CORE][WIP] Replicate large blocks a...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21451#discussion_r198656665
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -659,6 +701,11 @@ private[spark] class BlockManager(
        * Get block from remote block managers as serialized bytes.
        */
       def getRemoteBytes(blockId: BlockId): Option[ChunkedByteBuffer] = {
    +    // TODO if we change this method to return the ManagedBuffer, then getRemoteValues
    +    // could just use the inputStream on the temp file, rather than memory-mapping the file.
    +    // Until then, replication can cause the process to use too much memory and get killed
    +    // by the OS / cluster manager (not a java OOM, since its a memory-mapped file) even though
    --- End diff --
    
    it's
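
The TODO in the quoted diff contrasts memory-mapping the temp file with reading it through an input stream. A rough standalone comparison, assuming nothing about Spark's internals, might look like the following; `MapVersusStream` and its methods are hypothetical names for this sketch. Both paths see the same bytes, but the mapped path reserves address space for the whole file outside the Java heap, while the streamed path only ever holds one small buffer.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.zip.CRC32;

public class MapVersusStream {
    /** Maps the whole file; the mapping lives outside the Java heap. */
    static long checksumMapped(Path file) throws IOException {
        try (FileChannel ch = FileChannel.open(file, StandardOpenOption.READ)) {
            // A single mapping cannot exceed Integer.MAX_VALUE bytes.
            MappedByteBuffer mapped = ch.map(FileChannel.MapMode.READ_ONLY, 0, ch.size());
            CRC32 crc = new CRC32();
            crc.update(mapped);
            return crc.getValue();
        }
    }

    /** Streams the file through a fixed 8 KiB buffer, regardless of file size. */
    static long checksumStreamed(Path file) throws IOException {
        CRC32 crc = new CRC32();
        byte[] buf = new byte[8192];
        try (InputStream in = Files.newInputStream(file)) {
            int n;
            while ((n = in.read(buf)) != -1) {
                crc.update(buf, 0, n);
            }
        }
        return crc.getValue();
    }

    /** Writes a small test file and checks that both read paths agree. */
    static boolean demo() throws IOException {
        Path tmp = Files.createTempFile("block-", ".tmp");
        try {
            byte[] data = new byte[100_000];
            for (int i = 0; i < data.length; i++) {
                data[i] = (byte) i;
            }
            Files.write(tmp, data);
            return checksumMapped(tmp) == checksumStreamed(tmp);
        } finally {
            // On some platforms a file with a live mapping cannot be deleted right away.
            Files.deleteIfExists(tmp);
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("checksums match: " + demo());
    }
}
```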


---



[GitHub] spark pull request #21451: [SPARK-24296][CORE] Replicate large blocks as a s...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/21451


---



[GitHub] spark issue #21451: [SPARK-24296][CORE][WIP] Replicate large blocks as a str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21451
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/546/
    Test PASSed.


---



[GitHub] spark issue #21451: [SPARK-24296][CORE][WIP] Replicate large blocks as a str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21451
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21451: [SPARK-24296][CORE][WIP] Replicate large blocks as a str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21451
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21451: [SPARK-24296][CORE][WIP] Replicate large blocks as a str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21451
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21451: [SPARK-24296][CORE][WIP] Replicate large blocks as a str...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21451
  
    **[Test build #92407 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92407/testReport)** for PR 21451 at commit [`fa1928a`](https://github.com/apache/spark/commit/fa1928aa48655ca2fb036759260cfa71324ed37c).


---



[GitHub] spark issue #21451: [SPARK-24296][CORE] Replicate large blocks as a stream.

Posted by squito <gi...@git.apache.org>.
Github user squito commented on the issue:

    https://github.com/apache/spark/pull/21451
  
    looking.  So far it seems unrelated to me, but as you've said it's failed in a few builds, so I'm gonna keep digging.  The error is occurring before any RDDs are getting replicated via the new code path.  This change is mostly not touching the path involved in sending a broadcast.
    
    I've been unable to repro so far despite running the test hundreds of times, but I might need to run more tests or put in some pauses or something.  Gonna compare with other test runs with the failure as well.


---



[GitHub] spark issue #21451: [SPARK-24296][CORE][WIP] Replicate large blocks as a str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21451
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1315/
    Test PASSed.


---



[GitHub] spark issue #21451: [SPARK-24296][CORE][WIP] Replicate large blocks as a str...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21451
  
    **[Test build #93549 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93549/testReport)** for PR 21451 at commit [`fe31a7d`](https://github.com/apache/spark/commit/fe31a7d61ecabca76356b313211bb7b769e02b5b).


---



[GitHub] spark pull request #21451: [SPARK-24296][CORE] Replicate large blocks as a s...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21451#discussion_r209000299
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -404,6 +405,47 @@ private[spark] class BlockManager(
         putBytes(blockId, new ChunkedByteBuffer(data.nioByteBuffer()), level)(classTag)
       }
     
    +  override def putBlockDataAsStream(
    +      blockId: BlockId,
    +      level: StorageLevel,
    +      classTag: ClassTag[_]): StreamCallbackWithID = {
    +    // TODO if we're going to only put the data in the disk store, we should just write it directly
    +    // to the final location, but that would require a deeper refactor of this code.  So instead
    +    // we just write to a temp file, and call putBytes on the data in that file.
    +    val tmpFile = diskBlockManager.createTempLocalBlock()._2
    +    new StreamCallbackWithID {
    +      val channel: WritableByteChannel = Channels.newChannel(new FileOutputStream(tmpFile))
    --- End diff --
    
    yikes, good point!  this also goes for the existing fetch-to-disk code, right?


---



[GitHub] spark pull request #21451: [SPARK-24296][CORE][WIP] Replicate large blocks a...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21451#discussion_r192796600
  
    --- Diff: common/network-common/src/main/java/org/apache/spark/network/server/RpcHandler.java ---
    @@ -38,15 +38,24 @@
        *
        * This method will not be called in parallel for a single TransportClient (i.e., channel).
        *
    +   * The rpc *might* included a data stream in <code>streamData</code> (eg. for uploading a large
    +   * amount of data which should not be buffered in memory here).  Any errors while handling the
    +   * streamData will lead to failing this entire connection -- all other in-flight rpcs will fail.
    +   * If stream data is not null, you *must* call <code>streamData.registerStreamCallback</code>
    +   * before this method returns.
    +   *
        * @param client A channel client which enables the handler to make requests back to the sender
        *               of this RPC. This will always be the exact same object for a particular channel.
        * @param message The serialized bytes of the RPC.
    +   * @param streamData StreamData if there is data which is meant to be read via a StreamCallback;
    +   *                   otherwise it is null.
        * @param callback Callback which should be invoked exactly once upon success or failure of the
        *                 RPC.
        */
       public abstract void receive(
           TransportClient client,
           ByteBuffer message,
    +      StreamData streamData,
    --- End diff --
    
    I'm gonna move this discussion to https://github.com/apache/spark/pull/21346, since that is the PR that will introduce this API


---



[GitHub] spark issue #21451: [SPARK-24296][CORE][WIP] Replicate large blocks as a str...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21451
  
    **[Test build #92427 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92427/testReport)** for PR 21451 at commit [`bdfa6ff`](https://github.com/apache/spark/commit/bdfa6ff047f3f92325b628498c45fc2467b64b3b).


---



[GitHub] spark issue #21451: [SPARK-24296][CORE][WIP] Replicate large blocks as a str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21451
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #21451: [SPARK-24296][CORE][WIP] Replicate large blocks as a str...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21451
  
    **[Test build #93549 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93549/testReport)** for PR 21451 at commit [`fe31a7d`](https://github.com/apache/spark/commit/fe31a7d61ecabca76356b313211bb7b769e02b5b).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `public class UploadBlockStream extends BlockTransferMessage `


---



[GitHub] spark issue #21451: [SPARK-24296][CORE][WIP] Replicate large blocks as a str...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21451
  
    **[Test build #93255 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93255/testReport)** for PR 21451 at commit [`335e26d`](https://github.com/apache/spark/commit/335e26d168dc99e7317175da8732ff691ff512f2).


---



[GitHub] spark issue #21451: [SPARK-24296][CORE] Replicate large blocks as a stream.

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on the issue:

    https://github.com/apache/spark/pull/21451
  
    @squito Thanks for digging into it!
    
    This PR introduced the failing test case. We need to know whether it exposes a serious bug (one not introduced by this PR) that would impact our 2.4 release.


---



[GitHub] spark issue #21451: [SPARK-24296][CORE][WIP] Replicate large blocks as a str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21451
  
    Merged build finished. Test FAILed.


---



[GitHub] spark pull request #21451: [SPARK-24296][CORE] Replicate large blocks as a s...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21451#discussion_r207310135
  
    --- Diff: core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala ---
    @@ -73,10 +73,32 @@ class NettyBlockRpcServer(
             }
             val data = new NioManagedBuffer(ByteBuffer.wrap(uploadBlock.blockData))
             val blockId = BlockId(uploadBlock.blockId)
    +        logInfo(s"Receiving replicated block $blockId with level ${level} " +
    +          s"from ${client.getSocketAddress}")
             blockManager.putBlockData(blockId, data, level, classTag)
             responseContext.onSuccess(ByteBuffer.allocate(0))
         }
       }
     
    +  override def receiveStream(
    +      client: TransportClient,
    +      messageHeader: ByteBuffer,
    +      responseContext: RpcResponseCallback): StreamCallbackWithID = {
    +    val message =
    +      BlockTransferMessage.Decoder.fromByteBuffer(messageHeader).asInstanceOf[UploadBlockStream]
    +    val (level: StorageLevel, classTag: ClassTag[_]) = {
    +      serializer
    +        .newInstance()
    +        .deserialize(ByteBuffer.wrap(message.metadata))
    +        .asInstanceOf[(StorageLevel, ClassTag[_])]
    +    }
    +    val blockId = BlockId(message.blockId)
    +    logInfo(s"Receiving replicated block $blockId with level ${level} as stream " +
    --- End diff --
    
    debug?
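
For context on the `receiveStream` hook quoted above: it returns a `StreamCallbackWithID`, which is intended to consume the uploaded block as a stream rather than as one fully buffered message. A minimal sketch of that idea follows. The `ChunkCallback` trait and `FileSinkCallback` class are hypothetical names invented for this illustration and are not Spark's actual API; the sketch only assumes that chunks arrive as `ByteBuffer`s and can be appended to a file one at a time.

```scala
import java.nio.ByteBuffer
import java.nio.channels.FileChannel
import java.nio.file.{Files, Path, StandardOpenOption}

// Hypothetical callback shape for this sketch; NOT Spark's StreamCallbackWithID.
trait ChunkCallback {
  def onData(chunk: ByteBuffer): Unit
  def onComplete(): Unit
  def onFailure(cause: Throwable): Unit
}

// Appends each chunk to a file, so only one chunk needs to be in memory at a time.
final class FileSinkCallback(target: Path) extends ChunkCallback {
  private val channel = FileChannel.open(
    target,
    StandardOpenOption.CREATE,
    StandardOpenOption.WRITE,
    StandardOpenOption.TRUNCATE_EXISTING)
  private var written = 0L

  def bytesWritten: Long = written

  override def onData(chunk: ByteBuffer): Unit = {
    // A single write may be partial, so loop until the chunk is drained.
    while (chunk.hasRemaining) {
      written += channel.write(chunk)
    }
  }

  override def onComplete(): Unit = channel.close()

  override def onFailure(cause: Throwable): Unit = {
    // Drop the partial file so a failed upload leaves nothing behind.
    channel.close()
    Files.deleteIfExists(target)
  }
}

object StreamSketch {
  def main(args: Array[String]): Unit = {
    val tmp = Files.createTempFile("block", ".bin")
    val cb = new FileSinkCallback(tmp)
    // Simulate two chunks arriving from the network.
    Seq("abc", "defg").foreach(s => cb.onData(ByteBuffer.wrap(s.getBytes("UTF-8"))))
    cb.onComplete()
    println(s"wrote ${cb.bytesWritten} bytes, file size ${Files.size(tmp)}")
    Files.delete(tmp)
  }
}
```

In this sketch the memory held at any moment is bounded by the chunk size rather than the block size, which is the property the PR description is after for blocks larger than 2GB.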


---



[GitHub] spark issue #21451: [SPARK-24296][CORE][WIP] Replicate large blocks as a str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21451
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21451: [SPARK-24296][CORE][WIP] Replicate large blocks as a str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21451
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/92427/
    Test PASSed.


---



[GitHub] spark issue #21451: [SPARK-24296][CORE] Replicate large blocks as a stream.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21451
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #21451: [SPARK-24296][CORE] Replicate large blocks as a stream.

Posted by squito <gi...@git.apache.org>.
Github user squito commented on the issue:

    https://github.com/apache/spark/pull/21451
  
    @tgravescs @vanzin any more comments? I think I've addressed everything.


---



[GitHub] spark issue #21451: [SPARK-24296][CORE][WIP] Replicate large blocks as a str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21451
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/524/
    Test PASSed.


---



[GitHub] spark issue #21451: [SPARK-24296][CORE][WIP] Replicate large blocks as a str...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21451
  
    **[Test build #91268 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91268/testReport)** for PR 21451 at commit [`6ca6f8d`](https://github.com/apache/spark/commit/6ca6f8dc0dd7962194fc53e6cc9945a2f38e20dc).


---



[GitHub] spark pull request #21451: [SPARK-24296][CORE][WIP] Replicate large blocks a...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21451#discussion_r198653427
  
    --- Diff: core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala ---
    @@ -73,10 +73,34 @@ class NettyBlockRpcServer(
             }
             val data = new NioManagedBuffer(ByteBuffer.wrap(uploadBlock.blockData))
             val blockId = BlockId(uploadBlock.blockId)
    +        logInfo(s"Receiving replicated block $blockId with level ${level} " +
    +          s"from ${client.getSocketAddress}")
             blockManager.putBlockData(blockId, data, level, classTag)
             responseContext.onSuccess(ByteBuffer.allocate(0))
         }
       }
     
    +  override def receiveStream(
    +      client: TransportClient,
    +      messageHeader: ByteBuffer,
    +      responseContext: RpcResponseCallback): StreamCallbackWithID = {
    +    val message = BlockTransferMessage.Decoder.fromByteBuffer(messageHeader)
    +    message match {
    +      case uploadBlockStream: UploadBlockStream =>
    +       val (level: StorageLevel, classTag: ClassTag[_]) = {
    --- End diff --
    
    Indentation is off here.
    
    Using `.asInstanceOf[UploadBlockStream]` would achieve the same goal here with less indentation, just with a different exception...
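
The "different exception" mentioned above can be seen in a small standalone sketch. The `DemoMessage`, `DemoUploadBlock`, and `DemoUploadBlockStream` types below are hypothetical stand-ins for illustration, not Spark's classes. With a single-case `match`, an unexpected message type raises `scala.MatchError`; with `asInstanceOf`, it raises `java.lang.ClassCastException`.

```scala
import scala.util.Try

// Hypothetical stand-ins for the message hierarchy; NOT Spark's real classes.
trait DemoMessage
final case class DemoUploadBlock(blockId: String) extends DemoMessage
final case class DemoUploadBlockStream(blockId: String) extends DemoMessage

object CastVsMatch {
  // Pattern-match version: an unexpected type throws scala.MatchError.
  def viaMatch(msg: DemoMessage): String = msg match {
    case s: DemoUploadBlockStream => s.blockId
  }

  // Cast version: one less nesting level; an unexpected type throws ClassCastException.
  def viaCast(msg: DemoMessage): String =
    msg.asInstanceOf[DemoUploadBlockStream].blockId

  def main(args: Array[String]): Unit = {
    val ok = DemoUploadBlockStream("rdd_0_0")
    val bad = DemoUploadBlock("rdd_0_1")
    println(viaMatch(ok))
    println(viaCast(ok))
    // Print the exception class each approach produces for the wrong type.
    println(Try(viaMatch(bad)).failed.get.getClass.getName)
    println(Try(viaCast(bad)).failed.get.getClass.getName)
  }
}
```

Both forms reject the wrong message type, so the choice is mostly about readability and which exception appears in the logs.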


---



[GitHub] spark issue #21451: [SPARK-24296][CORE][WIP] Replicate large blocks as a str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21451
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3677/
    Test PASSed.


---



[GitHub] spark issue #21451: [SPARK-24296][CORE][WIP] Replicate large blocks as a str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21451
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3680/
    Test PASSed.


---



[GitHub] spark issue #21451: [SPARK-24296][CORE] Replicate large blocks as a stream.

Posted by rezasafi <gi...@git.apache.org>.
Github user rezasafi commented on the issue:

    https://github.com/apache/spark/pull/21451
  
    LGTM (after applying @tgravescs's comments). Great job on the whole issue.


---



[GitHub] spark issue #21451: [SPARK-24296][CORE][WIP] Replicate large blocks as a str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21451
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21451: [SPARK-24296][CORE][WIP] Replicate large blocks as a str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21451
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/93549/
    Test PASSed.


---
