Posted to reviews@spark.apache.org by squito <gi...@git.apache.org> on 2018/05/16 18:39:57 UTC

[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

GitHub user squito opened a pull request:

    https://github.com/apache/spark/pull/21346

    [SPARK-6237][NETWORK] Network-layer changes to allow stream upload.

    These changes allow an RPCHandler to receive an upload as a stream of
    data, without having to buffer the entire message in the FrameDecoder.
    The primary use case is for replicating large blocks.
    
    Added unit tests for handling streaming data, including successfully sending data, and failures in reading the stream with concurrent requests.
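To illustrate the idea in the description, here is a minimal sketch of consuming an upload chunk by chunk instead of buffering the whole message first. It is not the code in this pull request: `StreamingUploadSketch`, `ChunkSink`, `CountingSink`, and `upload` are hypothetical names, and the chunking is simulated in memory rather than read from a network channel.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class StreamingUploadSketch {
  /** Hypothetical callback, loosely modeled on a stream callback; not Spark's API. */
  interface ChunkSink {
    void onData(String streamId, ByteBuffer chunk);
    void onComplete(String streamId);
  }

  /** Keeps only a running byte count, so memory use does not grow with the upload size. */
  static final class CountingSink implements ChunkSink {
    long bytesSeen = 0;
    boolean completed = false;

    @Override
    public void onData(String streamId, ByteBuffer chunk) {
      bytesSeen += chunk.remaining();
    }

    @Override
    public void onComplete(String streamId) {
      completed = true;
    }
  }

  /** Feeds the payload to the sink in chunks of at most chunkSize bytes. */
  static void upload(String streamId, byte[] payload, int chunkSize, ChunkSink sink) {
    for (int off = 0; off < payload.length; off += chunkSize) {
      int len = Math.min(chunkSize, payload.length - off);
      sink.onData(streamId, ByteBuffer.wrap(payload, off, len));
    }
    sink.onComplete(streamId);
  }

  public static void main(String[] args) {
    byte[] payload = "pretend this is a very large block".getBytes(StandardCharsets.UTF_8);
    CountingSink sink = new CountingSink();
    upload("block-1", payload, 8, sink);
    System.out.println(sink.bytesSeen + " bytes seen, completed=" + sink.completed);
  }
}
```

The receiver never holds more than one chunk at a time, which is the property that matters when replicating large blocks.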

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/squito/spark upload_stream

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/21346.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #21346
    
----
commit 49e0a80f89433368d3a3116eb9fcd7854ceecb62
Author: Imran Rashid <ir...@...>
Date:   2018-05-02T14:55:15Z

    [SPARK-6237][NETWORK] Network-layer changes to allow stream upload.
    
    These changes allow an RPCHandler to receive an upload as a stream of
    data, without having to buffer the entire message in the FrameDecoder.
    The primary use case is for replicating large blocks.
    
    Added unit tests.

----


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21346#discussion_r191003869
  
    --- Diff: project/MimaExcludes.scala ---
    @@ -36,6 +36,9 @@ object MimaExcludes {
     
       // Exclude rules for 2.4.x
       lazy val v24excludes = v23excludes ++ Seq(
    +    // [SPARK-6237][NETWORK] Network-layer changes to allow stream upload
    +    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.network.netty.NettyBlockRpcServer.receive"),
    --- End diff --
    
    Kinda wondering why this class is public in the first place... along with `SparkTransportConf` in the same package.


---



[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21346#discussion_r191003553
  
    --- Diff: common/network-common/src/test/java/org/apache/spark/network/StreamTestHelper.java ---
    @@ -0,0 +1,101 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.network;
    +
    +import com.google.common.io.Files;
    +import org.apache.spark.network.buffer.FileSegmentManagedBuffer;
    --- End diff --
    
    Wrong place... basically in every file you've changed.


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    **[Test build #92349 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92349/testReport)** for PR 21346 at commit [`cd11abc`](https://github.com/apache/spark/commit/cd11abc3261d6f37731aa4574705119e0ac57a93).


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91925/
    Test FAILed.


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    **[Test build #91195 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91195/testReport)** for PR 21346 at commit [`7bd1b43`](https://github.com/apache/spark/commit/7bd1b43c81a3cdd7b88cf64994cfe8f2b3c5fdf8).


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91195/
    Test FAILed.


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91791/
    Test FAILed.


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    **[Test build #91854 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91854/testReport)** for PR 21346 at commit [`1a222aa`](https://github.com/apache/spark/commit/1a222aa77d2a31fd3b3ffe21edfc69ab99e80806).


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    **[Test build #91121 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91121/testReport)** for PR 21346 at commit [`2fef75f`](https://github.com/apache/spark/commit/2fef75f18a115db542afe96d49b8cbe9ed534d53).


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    **[Test build #91192 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91192/testReport)** for PR 21346 at commit [`f4d9123`](https://github.com/apache/spark/commit/f4d9123be67ee2421436af741289134013a9760f).


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/92349/
    Test PASSed.


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21346#discussion_r191981552
  
    --- Diff: common/network-common/src/main/java/org/apache/spark/network/server/RpcHandler.java ---
    @@ -38,15 +38,24 @@
        *
        * This method will not be called in parallel for a single TransportClient (i.e., channel).
        *
    +   * The rpc *might* included a data stream in <code>streamData</code> (eg. for uploading a large
    +   * amount of data which should not be buffered in memory here).  Any errors while handling the
    +   * streamData will lead to failing this entire connection -- all other in-flight rpcs will fail.
    --- End diff --
    
    Pretty good question, actually :)
    
    I will take a closer look at this myself, but I believe this connection is shared by other tasks running on the same executor that are trying to talk to the same destination. So that might mean another task that is replicating to the same destination, or reading data from that same remote executor. Those don't have specific retry behavior for a closed connection -- that might result in the data just not getting replicated, the data being fetched from elsewhere, or the task getting retried.
    
    I think this is actually OK -- the existing code could cause an OOM on the remote end anyway, which obviously would fail a lot more. This failure behavior seems reasonable.


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    **[Test build #91324 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91324/testReport)** for PR 21346 at commit [`83c3271`](https://github.com/apache/spark/commit/83c3271d2f45bbef18d865bddbc6807e9fbd2503).


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3996/
    Test PASSed.


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    **[Test build #91136 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91136/testReport)** for PR 21346 at commit [`32f4f94`](https://github.com/apache/spark/commit/32f4f94e3cde50015a8ea478969636fca708cf82).


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    **[Test build #91793 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91793/testReport)** for PR 21346 at commit [`8a18da5`](https://github.com/apache/spark/commit/8a18da511a8053fc4dcf6529f49333d71bd6277d).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    **[Test build #90694 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90694/testReport)** for PR 21346 at commit [`fa3ac4e`](https://github.com/apache/spark/commit/fa3ac4e8b0c80964e93e9e777106f97324308293).


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    **[Test build #4188 has started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/4188/testReport)** for PR 21346 at commit [`3098b9c`](https://github.com/apache/spark/commit/3098b9cd9ffc29517b446bb660fe5be9f0031cc1).


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    **[Test build #4194 has started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/4194/testReport)** for PR 21346 at commit [`83c3271`](https://github.com/apache/spark/commit/83c3271d2f45bbef18d865bddbc6807e9fbd2503).


---



[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21346#discussion_r191059478
  
    --- Diff: common/network-common/src/test/java/org/apache/spark/network/StreamTestHelper.java ---
    @@ -0,0 +1,101 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.network;
    +
    +import com.google.common.io.Files;
    +import org.apache.spark.network.buffer.FileSegmentManagedBuffer;
    --- End diff --
    
    Oops, sorry. I got used to the style checker warning about these in Scala. Fixed these now.


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91138/
    Test FAILed.


---



[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21346#discussion_r195819256
  
    --- Diff: common/network-common/src/test/java/org/apache/spark/network/RpcIntegrationSuite.java ---
    @@ -130,6 +200,59 @@ public void onFailure(Throwable e) {
         return res;
       }
     
    +  private RpcResult sendRpcWithStream(String... streams) throws Exception {
    +    TransportClient client = clientFactory.createClient(TestUtils.getLocalHost(), server.getPort());
    +    final Semaphore sem = new Semaphore(0);
    +    RpcResult res = new RpcResult();
    +    res.successMessages = Collections.synchronizedSet(new HashSet<String>());
    +    res.errorMessages = Collections.synchronizedSet(new HashSet<String>());
    +
    +    for (String stream : streams) {
    +      int idx = stream.lastIndexOf('/');
    +      ManagedBuffer meta = new NioManagedBuffer(JavaUtils.stringToBytes(stream));
    +      String streamName = (idx == -1) ? stream : stream.substring(idx + 1);
    +      ManagedBuffer data = testData.openStream(conf, streamName);
    +      client.uploadStream(meta, data, new RpcStreamCallback(stream, res, sem));
    +    }
    +
    +    if (!sem.tryAcquire(streams.length, 5, TimeUnit.SECONDS)) {
    +      fail("Timeout getting response from the server");
    +    }
    +    streamCallbacks.values().forEach(streamCallback -> {
    +      try {
    +        streamCallback.waitForCompletionAndVerify(TimeUnit.SECONDS.toMillis(5));
    +      } catch (IOException e) {
    --- End diff --
    
    Method throws `Exception`, so this seems unnecessary.
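One way to act on this comment, sketched under assumptions: a lambda passed to `forEach` implements `java.util.function.Consumer`, whose `accept` method declares no checked exceptions, so the `catch` cannot simply be deleted while the lambda stays. A plain `for` loop lets the checked `IOException` propagate through the enclosing method's `throws Exception`. `CheckedInLoopSketch`, `Verifiable`, and `verifyAll` are hypothetical stand-ins, not names from the test suite.

```java
import java.io.IOException;
import java.util.Arrays;
import java.util.List;

public class CheckedInLoopSketch {
  /** Hypothetical stand-in for a callback whose verification may throw a checked exception. */
  interface Verifiable {
    void verify() throws IOException;
  }

  /** A plain for loop: any IOException propagates to the caller, no try/catch required. */
  static int verifyAll(List<Verifiable> callbacks) throws Exception {
    int verified = 0;
    for (Verifiable callback : callbacks) {
      callback.verify();
      verified++;
    }
    return verified;
  }

  public static void main(String[] args) throws Exception {
    Verifiable ok = () -> {};
    System.out.println("verified: " + verifyAll(Arrays.asList(ok, ok)));
  }
}
```

Whether the loop or the existing lambda reads better in the test is a judgment call for the author; the sketch only shows that the loop form needs no wrapper.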


---



[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21346#discussion_r195796301
  
    --- Diff: common/network-common/src/main/java/org/apache/spark/network/protocol/UploadStream.java ---
    @@ -0,0 +1,107 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.network.protocol;
    +
    +import java.io.IOException;
    +import java.nio.ByteBuffer;
    +
    +import com.google.common.base.Objects;
    +import io.netty.buffer.ByteBuf;
    +
    +import org.apache.spark.network.buffer.ManagedBuffer;
    +import org.apache.spark.network.buffer.NettyManagedBuffer;
    +
    +/**
    + * An RPC with data that is sent outside of the frame, so it can be read as a stream.
    + */
    +public final class UploadStream extends AbstractMessage implements RequestMessage {
    --- End diff --
    
    Perhaps, but do you think that is really that useful? The handling of them is different (both in the network layer and the outer RpcHandler). And other things being equal, I'm biased toward fewer changes to existing code paths.


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    **[Test build #92347 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92347/testReport)** for PR 21346 at commit [`58d52b9`](https://github.com/apache/spark/commit/58d52b970b95b1a0bdbb6829371615cbbcf3e936).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21346#discussion_r195464933
  
    --- Diff: common/network-common/src/main/java/org/apache/spark/network/server/RpcHandler.java ---
    @@ -38,15 +38,28 @@
        *
        * This method will not be called in parallel for a single TransportClient (i.e., channel).
        *
    +   * The rpc *might* included a data stream in <code>streamData</code> (eg. for uploading a large
    +   * amount of data which should not be buffered in memory here).  An error while reading data from
    +   * the stream ({@link org.apache.spark.network.client.StreamCallback#onData(String, ByteBuffer)})
    +   * will fail the entire channel.  A failure in "post-processing" the stream in
    +   * {@link org.apache.spark.network.client.StreamCallback#onComplete(String)} will result in an
    +   * rpcFailure, but the channel will remain active.
    +   *
    +   * If streamData is not null, you *must* call <code>streamData.registerStreamCallback</code>
    +   * before this method returns.
    +   *
        * @param client A channel client which enables the handler to make requests back to the sender
        *               of this RPC. This will always be the exact same object for a particular channel.
        * @param message The serialized bytes of the RPC.
    +   * @param streamData StreamData if there is data which is meant to be read via a StreamCallback;
    --- End diff --
    
    I've done this refactoring, and I agree it made the change significantly simpler.
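The two failure modes described in the Javadoc above can be sketched as follows. This is a toy model under assumptions, not the handler code from the pull request: `StreamFailureSketch`, `Callback`, `Outcome`, `callback`, and `deliver` are hypothetical names, and the "channel" is reduced to an enum value.

```java
import java.nio.ByteBuffer;

public class StreamFailureSketch {
  /** Hypothetical callback; mirrors only the two methods named in the Javadoc. */
  interface Callback {
    void onData(String streamId, ByteBuffer buf) throws Exception;
    void onComplete(String streamId) throws Exception;
  }

  enum Outcome { SUCCESS, RPC_FAILURE_CHANNEL_ACTIVE, CHANNEL_FAILED }

  /** Builds a callback that optionally fails in each phase (for illustration only). */
  static Callback callback(boolean failOnData, boolean failOnComplete) {
    return new Callback() {
      @Override
      public void onData(String streamId, ByteBuffer buf) throws Exception {
        if (failOnData) throw new Exception("error while reading the stream");
      }

      @Override
      public void onComplete(String streamId) throws Exception {
        if (failOnComplete) throw new Exception("error while post-processing");
      }
    };
  }

  /** An error in onData fails the channel; an error in onComplete fails only the RPC. */
  static Outcome deliver(String streamId, ByteBuffer[] chunks, Callback cb) {
    try {
      for (ByteBuffer chunk : chunks) {
        cb.onData(streamId, chunk);
      }
    } catch (Exception e) {
      return Outcome.CHANNEL_FAILED;
    }
    try {
      cb.onComplete(streamId);
    } catch (Exception e) {
      return Outcome.RPC_FAILURE_CHANNEL_ACTIVE;
    }
    return Outcome.SUCCESS;
  }

  public static void main(String[] args) {
    ByteBuffer[] chunks = { ByteBuffer.allocate(4), ByteBuffer.allocate(4) };
    System.out.println(deliver("s1", chunks, callback(false, false)));
    System.out.println(deliver("s2", chunks, callback(true, false)));
    System.out.println(deliver("s3", chunks, callback(false, true)));
  }
}
```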


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    **[Test build #91122 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91122/testReport)** for PR 21346 at commit [`54533c2`](https://github.com/apache/spark/commit/54533c2c882a399d5708da8dfe6a518dd6132844).


---



[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21346#discussion_r191001160
  
    --- Diff: common/network-common/src/test/java/org/apache/spark/network/RpcIntegrationSuite.java ---
    @@ -17,17 +17,21 @@
     
     package org.apache.spark.network;
     
    +import java.io.*;
     import java.nio.ByteBuffer;
    -import java.util.ArrayList;
    -import java.util.Collections;
    -import java.util.HashSet;
    -import java.util.Iterator;
    -import java.util.List;
    -import java.util.Set;
    +import java.util.*;
    +import java.util.concurrent.ConcurrentHashMap;
     import java.util.concurrent.Semaphore;
     import java.util.concurrent.TimeUnit;
     
     import com.google.common.collect.Sets;
    +import com.google.common.io.Files;
    +import org.apache.commons.lang3.tuple.ImmutablePair;
    +import org.apache.commons.lang3.tuple.Pair;
    +import org.apache.spark.network.buffer.ManagedBuffer;
    --- End diff --
    
    Wrong place.


---



[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21346#discussion_r190999775
  
    --- Diff: common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java ---
    @@ -23,25 +23,16 @@
     import com.google.common.base.Throwables;
     import io.netty.channel.Channel;
     import io.netty.channel.ChannelFuture;
    +import org.apache.spark.network.protocol.*;
    --- End diff --
    
    These are in the wrong place.


---



[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21346#discussion_r191935821
  
    --- Diff: common/network-common/src/main/java/org/apache/spark/network/client/StreamInterceptor.java ---
    @@ -50,16 +52,22 @@
     
       @Override
       public void exceptionCaught(Throwable cause) throws Exception {
    -    handler.deactivateStream();
    +    deactivateStream();
         callback.onFailure(streamId, cause);
       }
     
       @Override
       public void channelInactive() throws Exception {
    -    handler.deactivateStream();
    +    deactivateStream();
         callback.onFailure(streamId, new ClosedChannelException());
       }
     
    +  private void deactivateStream() {
    +    if (handler instanceof TransportResponseHandler) {
    --- End diff --
    
    Why don't we need to do this for `TransportRequestHandler`?


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    **[Test build #91122 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91122/testReport)** for PR 21346 at commit [`54533c2`](https://github.com/apache/spark/commit/54533c2c882a399d5708da8dfe6a518dd6132844).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    **[Test build #91138 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91138/testReport)** for PR 21346 at commit [`331124b`](https://github.com/apache/spark/commit/331124b125db6b59009e12249542f667a227226e).


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    **[Test build #91056 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91056/testReport)** for PR 21346 at commit [`3098b9c`](https://github.com/apache/spark/commit/3098b9cd9ffc29517b446bb660fe5be9f0031cc1).


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    **[Test build #91136 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91136/testReport)** for PR 21346 at commit [`32f4f94`](https://github.com/apache/spark/commit/32f4f94e3cde50015a8ea478969636fca708cf82).
     * This patch **fails Java style tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    **[Test build #92101 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92101/testReport)** for PR 21346 at commit [`fd62f61`](https://github.com/apache/spark/commit/fd62f615369e287d6deb707d6b0bfa11cfead2fe).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    Merged build finished. Test FAILed.


---



[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21346#discussion_r191001378
  
    --- Diff: common/network-common/src/test/java/org/apache/spark/network/RpcIntegrationSuite.java ---
    @@ -85,10 +96,52 @@ public void receive(TransportClient client, ByteBuffer message) {
         oneWayMsgs = new ArrayList<>();
       }
     
    +  private static void receiveStream(String msg, StreamData streamData) {
    +    try {
    +      if (msg.startsWith("fail/")) {
    +        String[] parts = msg.split("/");
    +        switch(parts[1]) {
    --- End diff --
    
    space before `(`


---



[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21346#discussion_r191964329
  
    --- Diff: project/MimaExcludes.scala ---
    @@ -36,6 +36,9 @@ object MimaExcludes {
     
       // Exclude rules for 2.4.x
       lazy val v24excludes = v23excludes ++ Seq(
    +    // [SPARK-6237][NETWORK] Network-layer changes to allow stream upload
    +    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.network.netty.NettyBlockRpcServer.receive"),
    --- End diff --
    
    I suspect that it's because we might want to access these across Java package boundaries, and Java doesn't have an equivalent of Scala's nested, package-scoped `private[package]`.


---



[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21346#discussion_r195592158
  
    --- Diff: common/network-common/src/test/java/org/apache/spark/network/StreamTestHelper.java ---
    @@ -0,0 +1,102 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.network;
    +
    +import java.io.File;
    +import java.io.FileOutputStream;
    +import java.nio.ByteBuffer;
    +import java.util.Random;
    +
    +import com.google.common.io.Files;
    +
    +import org.apache.spark.network.buffer.FileSegmentManagedBuffer;
    +import org.apache.spark.network.buffer.ManagedBuffer;
    +import org.apache.spark.network.buffer.NioManagedBuffer;
    +import org.apache.spark.network.util.TransportConf;
    +
    +class StreamTestHelper {
    +  static final String[] STREAMS = { "largeBuffer", "smallBuffer", "emptyBuffer", "file" };
    +
    +  final File testFile;
    +  File tempDir;
    --- End diff --
    
    `final` for all these?
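
A minimal sketch of what the suggestion amounts to, assuming the fields are assigned exactly once in the constructor. `BufferFixture` is a hypothetical stand-in, not the helper class from the PR; only the `createBuffer` logic mirrors the quoted test code.

```java
import java.nio.ByteBuffer;

// Hypothetical stand-in for a test helper whose fields are assigned once.
final class BufferFixture {
  // `final` lets the compiler reject any later reassignment.
  final ByteBuffer emptyBuffer;
  final ByteBuffer smallBuffer;

  BufferFixture() {
    // Each final field must be assigned exactly once before the constructor returns.
    emptyBuffer = createBuffer(0);
    smallBuffer = createBuffer(100);
  }

  // Fills a buffer with the low byte of each index, then flips it for reading.
  private static ByteBuffer createBuffer(int bufSize) {
    ByteBuffer buf = ByteBuffer.allocate(bufSize);
    for (int i = 0; i < bufSize; i++) {
      buf.put((byte) i);
    }
    buf.flip();
    return buf;
  }
}
```

Note that `final` only fixes the reference; the contents and position of each `ByteBuffer` remain mutable.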


---



[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21346#discussion_r190997078
  
    --- Diff: common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java ---
    @@ -244,6 +242,54 @@ public long sendRpc(ByteBuffer message, RpcResponseCallback callback) {
         return requestId;
       }
     
    +  /**
    +   * Send data to the remote end as a stream.   This differs from stream() in that this is a request
    +   * to *send* data to the remote end, not to receive it from the remote.
    +   *
    +   * @param meta meta data associated with the stream, which will be read completely on the
    +   *             receiving end before the stream itself.
    +   * @param data this will be streamed to the remote end to allow for transferring large amounts
    +   *             of data without reading into memory.
    +   * @param callback handles the reply -- onSuccess will only be called when both message and data
    +   *                 are received successfully.
    +   */
    +  public long uploadStream(
    +      ManagedBuffer meta,
    +      ManagedBuffer data,
    +      RpcResponseCallback callback) {
    +    long startTime = System.currentTimeMillis();
    +    if (logger.isTraceEnabled()) {
    +      logger.trace("Sending RPC to {}", getRemoteAddress(channel));
    +    }
    +
    +    long requestId = Math.abs(UUID.randomUUID().getLeastSignificantBits());
    +    handler.addRpcRequest(requestId, callback);
    +
    +    channel.writeAndFlush(new UploadStream(requestId, meta, data))
    +        .addListener(future -> {
    +          if (future.isSuccess()) {
    --- End diff --
    
    First reaction is that it's about the right time to refactor this into a helper method... all instances in this class look quite similar.


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/106/
    Test PASSed.


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21346#discussion_r195586580
  
    --- Diff: common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java ---
    @@ -216,34 +192,99 @@ public long sendRpc(ByteBuffer message, RpcResponseCallback callback) {
           logger.trace("Sending RPC to {}", getRemoteAddress(channel));
         }
     
    -    long requestId = Math.abs(UUID.randomUUID().getLeastSignificantBits());
    +    long requestId = requestId();
         handler.addRpcRequest(requestId, callback);
     
         channel.writeAndFlush(new RpcRequest(requestId, new NioManagedBuffer(message)))
    -        .addListener(future -> {
    -          if (future.isSuccess()) {
    -            long timeTaken = System.currentTimeMillis() - startTime;
    -            if (logger.isTraceEnabled()) {
    -              logger.trace("Sending request {} to {} took {} ms", requestId,
    -                getRemoteAddress(channel), timeTaken);
    -            }
    -          } else {
    -            String errorMsg = String.format("Failed to send RPC %s to %s: %s", requestId,
    -              getRemoteAddress(channel), future.cause());
    -            logger.error(errorMsg, future.cause());
    -            handler.removeRpcRequest(requestId);
    -            channel.close();
    -            try {
    -              callback.onFailure(new IOException(errorMsg, future.cause()));
    -            } catch (Exception e) {
    -              logger.error("Uncaught exception in RPC response callback handler!", e);
    -            }
    -          }
    -        });
    +      .addListener(new RpcChannelListener(startTime, requestId, callback));
     
         return requestId;
       }
     
    +  /**
    +   * Send data to the remote end as a stream.   This differs from stream() in that this is a request
    +   * to *send* data to the remote end, not to receive it from the remote.
    +   *
    +   * @param meta meta data associated with the stream, which will be read completely on the
    +   *             receiving end before the stream itself.
    +   * @param data this will be streamed to the remote end to allow for transferring large amounts
    +   *             of data without reading into memory.
    +   * @param callback handles the reply -- onSuccess will only be called when both message and data
    +   *                 are received successfully.
    +   */
    +  public long uploadStream(
    +      ManagedBuffer meta,
    +      ManagedBuffer data,
    +      RpcResponseCallback callback) {
    +    long startTime = System.currentTimeMillis();
    --- End diff --
    
    Seems like it should be easy to move this to `StdChannelListener`'s constructor. Looks pretty similar in all methods.
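
The suggestion can be sketched without Netty as follows. This is an illustration under stated assumptions, not the PR's code: `TimedSendListener`, `RpcSendListener`, and the `operationComplete(Throwable)` signature are hypothetical stand-ins. The listener records the current time in its own constructor, so callers no longer need a `startTime` local.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical base listener: captures its creation time so callers need not pass one in.
class TimedSendListener {
  final long startTime;
  final Object requestId;
  // Collected messages stand in for logger calls so the sketch stays dependency-free.
  final List<String> log = new ArrayList<>();

  TimedSendListener(Object requestId) {
    this.startTime = System.currentTimeMillis();
    this.requestId = requestId;
  }

  // Called once the write completes; cause is null on success.
  final void operationComplete(Throwable cause) {
    if (cause == null) {
      long timeTaken = System.currentTimeMillis() - startTime;
      log.add("Sending request " + requestId + " took " + timeTaken + " ms");
    } else {
      log.add("Failed to send request " + requestId + ": " + cause);
      handleFailure(cause);
    }
  }

  // Subclasses override this to run request-specific cleanup.
  void handleFailure(Throwable cause) {}
}

// Hypothetical RPC-specific listener that only adds its own failure handling.
class RpcSendListener extends TimedSendListener {
  boolean failed = false;

  RpcSendListener(long rpcRequestId) {
    super("RPC " + rpcRequestId);
  }

  @Override
  void handleFailure(Throwable cause) {
    failed = true;
  }
}
```

One trade-off: the clock now starts when the listener is constructed rather than at method entry, so any work done before construction is not included in the reported time.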


---



[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21346#discussion_r191978140
  
    --- Diff: common/network-common/src/main/java/org/apache/spark/network/client/StreamInterceptor.java ---
    @@ -50,16 +52,22 @@
     
       @Override
       public void exceptionCaught(Throwable cause) throws Exception {
    -    handler.deactivateStream();
    +    deactivateStream();
         callback.onFailure(streamId, cause);
       }
     
       @Override
       public void channelInactive() throws Exception {
    -    handler.deactivateStream();
    +    deactivateStream();
         callback.onFailure(streamId, new ClosedChannelException());
       }
     
    +  private void deactivateStream() {
    +    if (handler instanceof TransportResponseHandler) {
    --- End diff --
    
    The only purpose of `TransportResponseHandler.deactivateStream()` is to include the stream request in the count for `numOutstandingRequests` (it's not doing any critical cleanup).  I will include a comment here explaining that.
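
A rough sketch of the behavior described in this reply, assuming a simplified handler hierarchy. `Handler`, `ResponseHandler`, `RequestHandler`, and `Interceptor` are hypothetical stand-ins for the real classes, and the counting logic is a guess at the general shape, not the actual implementation.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical minimal handler hierarchy, standing in for the real message handlers.
abstract class Handler {}

class ResponseHandler extends Handler {
  private final AtomicInteger outstandingFetches = new AtomicInteger();
  private volatile boolean streamActive = false;

  void activateStream() { streamActive = true; }
  void deactivateStream() { streamActive = false; }
  void addFetch() { outstandingFetches.incrementAndGet(); }

  // An active stream counts as one outstanding request.
  int numOutstandingRequests() {
    return outstandingFetches.get() + (streamActive ? 1 : 0);
  }
}

// The request side keeps no stream-activity bookkeeping in this sketch.
class RequestHandler extends Handler {}

class Interceptor {
  private final Handler handler;

  Interceptor(Handler handler) {
    this.handler = handler;
  }

  // Only the response side tracks an active stream, so other handlers are left alone.
  void deactivateStream() {
    if (handler instanceof ResponseHandler) {
      ((ResponseHandler) handler).deactivateStream();
    }
  }
}
```

In this shape, skipping the call for a request-side handler loses nothing, because the flag only feeds the outstanding-request count.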


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    **[Test build #91192 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91192/testReport)** for PR 21346 at commit [`f4d9123`](https://github.com/apache/spark/commit/f4d9123be67ee2421436af741289134013a9760f).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `  class RpcChannelListener extends StdChannelListener `


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    **[Test build #92349 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92349/testReport)** for PR 21346 at commit [`cd11abc`](https://github.com/apache/spark/commit/cd11abc3261d6f37731aa4574705119e0ac57a93).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21346#discussion_r195592205
  
    --- Diff: common/network-common/src/test/java/org/apache/spark/network/StreamTestHelper.java ---
    @@ -0,0 +1,102 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.network;
    +
    +import java.io.File;
    +import java.io.FileOutputStream;
    +import java.nio.ByteBuffer;
    +import java.util.Random;
    +
    +import com.google.common.io.Files;
    +
    +import org.apache.spark.network.buffer.FileSegmentManagedBuffer;
    +import org.apache.spark.network.buffer.ManagedBuffer;
    +import org.apache.spark.network.buffer.NioManagedBuffer;
    +import org.apache.spark.network.util.TransportConf;
    +
    +class StreamTestHelper {
    +  static final String[] STREAMS = { "largeBuffer", "smallBuffer", "emptyBuffer", "file" };
    +
    +  final File testFile;
    +  File tempDir;
    +
    +  ByteBuffer emptyBuffer;
    +  ByteBuffer smallBuffer;
    +  ByteBuffer largeBuffer;
    +
    +  private static ByteBuffer createBuffer(int bufSize) {
    +    ByteBuffer buf = ByteBuffer.allocate(bufSize);
    +    for (int i = 0; i < bufSize; i ++) {
    +      buf.put((byte) i);
    +    }
    +    buf.flip();
    +    return buf;
    +  }
    +
    +  StreamTestHelper() throws Exception {
    +    tempDir = Files.createTempDir();
    +    emptyBuffer = createBuffer(0);
    +    smallBuffer = createBuffer(100);
    +    largeBuffer = createBuffer(100000);
    +
    +    testFile = File.createTempFile("stream-test-file", "txt", tempDir);
    +    FileOutputStream fp = new FileOutputStream(testFile);
    +    try {
    +      Random rnd = new Random();
    +      for (int i = 0; i < 512; i++) {
    +        byte[] fileContent = new byte[1024];
    +        rnd.nextBytes(fileContent);
    +        fp.write(fileContent);
    +      }
    +    } finally {
    +      fp.close();
    +    }
    +  }
    +
    +  public ByteBuffer srcBuffer(String name) {
    +    switch (name) {
    +      case "largeBuffer":
    +        return largeBuffer;
    +      case "smallBuffer":
    +        return smallBuffer;
    +      case "emptyBuffer":
    +        return emptyBuffer;
    +      default:
    +        throw new IllegalArgumentException("Invalid stream: " + name);
    +    }
    +  }
    +
    +  public ManagedBuffer openStream(TransportConf conf, String streamId) {
    +    switch (streamId) {
    +      case "file":
    +        return new FileSegmentManagedBuffer(conf, testFile, 0, testFile.length());
    +      default:
    +        return new NioManagedBuffer(srcBuffer(streamId));
    +    }
    +  }
    +
    +
    --- End diff --
    
    nit: remove


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/329/
    Test PASSed.


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91324/
    Test FAILed.


---



[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21346#discussion_r191979019
  
    --- Diff: common/network-common/src/main/java/org/apache/spark/network/protocol/UploadStream.java ---
    @@ -0,0 +1,107 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.network.protocol;
    +
    +import java.io.IOException;
    +import java.nio.ByteBuffer;
    +
    +import com.google.common.base.Objects;
    +import io.netty.buffer.ByteBuf;
    +
    +import org.apache.spark.network.buffer.ManagedBuffer;
    +import org.apache.spark.network.buffer.NettyManagedBuffer;
    +
    +/**
    + * An RPC with data that is sent outside of the frame, so it can be read as a stream.
    + */
    +public final class UploadStream extends AbstractMessage implements RequestMessage {
    +  /** Used to link an RPC request with its response. */
    +  public final long requestId;
    +  public final ManagedBuffer meta;
    +  public final long bodyByteCount;
    +
    +  public UploadStream(long requestId, ManagedBuffer meta, ManagedBuffer body) {
    +    super(body, false); // body is *not* included in the frame
    +    this.requestId = requestId;
    +    this.meta = meta;
    +    bodyByteCount = body.size();
    +  }
    +
    +  // this version is called when decoding the bytes on the receiving end.  The body is handled
    +  // separately.
    +  private UploadStream(long requestId, ManagedBuffer meta, long bodyByteCount) {
    +    super(null, false);
    +    this.requestId = requestId;
    +    this.meta = meta;
    +    this.bodyByteCount = bodyByteCount;
    +  }
    +
    +  @Override
    +  public Type type() { return Type.UploadStream; }
    +
    +  @Override
    +  public int encodedLength() {
    +    // the requestId, meta size, meta and bodyByteCount (body is not included)
    +    return 8 + 4 + ((int) meta.size()) + 8;
    +  }
    +
    +  @Override
    +  public void encode(ByteBuf buf) {
    +    buf.writeLong(requestId);
    +    try {
    +      ByteBuffer metaBuf = meta.nioByteBuffer();
    +      buf.writeInt(metaBuf.remaining());
    +      buf.writeBytes(metaBuf);
    +    } catch (IOException io) {
    +      throw new RuntimeException(io);
    +    }
    +    buf.writeLong(bodyByteCount);
    +  }
    +
    +  public static UploadStream decode(ByteBuf buf) {
    +    long requestId = buf.readLong();
    +    int metaSize = buf.readInt();
    +    ManagedBuffer meta = new NettyManagedBuffer(buf.readRetainedSlice(metaSize));
    +    long bodyByteCount = buf.readLong();
    +    // This is called by the frame decoder, so the data is still null.  We need a StreamInterceptor
    +    // to read the data.
    +    return new UploadStream(requestId, meta, bodyByteCount);
    +  }
    +
    +  @Override
    +  public int hashCode() {
    +    return Objects.hashCode(requestId, body());
    --- End diff --
    
    This is a good point.  Admittedly I just copied this from `StreamResponse` without thinking about it too much -- that class exhibits the same issue.  I'll remove `body` from both.
    
    (In practice, we're not sticking them in hashmaps now, so there wouldn't be any bugs in behavior because of this.)
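
A minimal sketch of the shape this reply describes: `hashCode` and `equals` computed from the request id only, with the body left out. `UploadHeader` is a hypothetical class, and it uses the JDK's `java.util.Objects` rather than the Guava `Objects` imported in the quoted diff, purely to stay self-contained.

```java
import java.util.Objects;

// Hypothetical message type: the id is immutable, the body may be absent.
final class UploadHeader {
  final long requestId;
  final byte[] body; // may be null on the receiving side; excluded from equals/hashCode

  UploadHeader(long requestId, byte[] body) {
    this.requestId = requestId;
    this.body = body;
  }

  @Override
  public int hashCode() {
    // Hash only the field that is always present and never changes.
    return Objects.hash(requestId);
  }

  @Override
  public boolean equals(Object other) {
    if (!(other instanceof UploadHeader)) {
      return false;
    }
    // Compare on the same field used by hashCode so the two stay consistent.
    return requestId == ((UploadHeader) other).requestId;
  }
}
```

With this definition, a sender-side instance that carries a body and a receiver-side instance that does not still compare equal and hash identically when they share a request id.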


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21346#discussion_r190999338
  
    --- Diff: common/network-common/src/main/java/org/apache/spark/network/server/StreamData.java ---
    @@ -0,0 +1,97 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.network.server;
    +
    +import org.apache.spark.network.client.RpcResponseCallback;
    +import org.apache.spark.network.client.StreamCallback;
    +import org.apache.spark.network.client.StreamInterceptor;
    +import org.apache.spark.network.util.TransportFrameDecoder;
    +
    +import java.io.IOException;
    +import java.nio.ByteBuffer;
    +
    +/**
    + * A holder for streamed data sent along with an RPC message.
    + */
    +public class StreamData {
    +
    +  private final TransportRequestHandler handler;
    +  private final TransportFrameDecoder frameDecoder;
    +  private final RpcResponseCallback rpcCallback;
    +  private final ByteBuffer meta;
    +  private final long streamByteCount;
    +  private boolean hasCallback = false;
    +
    +  public StreamData(
    +      TransportRequestHandler handler,
    +      TransportFrameDecoder frameDecoder,
    +      RpcResponseCallback rpcCallback,
    +      ByteBuffer meta,
    +      long streamByteCount) {
    +    this.handler = handler;
    +    this.frameDecoder = frameDecoder;
    +    this.rpcCallback = rpcCallback;
    +    this.meta = meta;
    +    this.streamByteCount = streamByteCount;
    +  }
    +
    +  public boolean hasCallback() {
    +    return hasCallback;
    +  }
    +
    +  /**
    +   * Register callback to receive the streaming data.
    +   *
    +   * If an exception is thrown from the callback, it will be propagated back to the sender as an rpc
    +   * failure.
    +   * @param callback
    --- End diff --
    
    Either remove this or document all parameters (and add an empty line before it).


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    **[Test build #91792 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91792/testReport)** for PR 21346 at commit [`cf991a9`](https://github.com/apache/spark/commit/cf991a9d1923d6639315f5b2a75642a316d394e9).


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    The last failures are known flaky tests.
    
    A few updates here since my last set of comments.  I've posted an overall design doc, and shared the tests I'm running on a cluster.  I think the tests cover all the cases we care about, but I would appreciate review of those tests too.  I can change this to use the existing pull approach for large blocks, rather than updating the push one, if you want.  If you're OK with this, there will be one more PR on top of this to make use of the new uploadStream functionality.
    
    There will be another PR as well to cover reading large remote blocks in memory for SPARK-24307.


---



[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21346#discussion_r192795662
  
    --- Diff: common/network-common/src/main/java/org/apache/spark/network/server/RpcHandler.java ---
    @@ -38,15 +38,24 @@
        *
        * This method will not be called in parallel for a single TransportClient (i.e., channel).
        *
    +   * The rpc *might* included a data stream in <code>streamData</code> (eg. for uploading a large
    +   * amount of data which should not be buffered in memory here).  Any errors while handling the
    +   * streamData will lead to failing this entire connection -- all other in-flight rpcs will fail.
    --- End diff --
    
    You bring up a good point here.  I was thinking about the places where an error might occur:
    
    1) while reading the stream data (i.e. StreamCallback.onData).  In the intended use case, this is basically just opening a file and writing bytes to it.
    
    2) while post-processing the complete data (StreamCallback.onComplete).  This does the whole BlockManager.put, which can be rather complex.
    
    Failures in (1) are unlikely and difficult to recover from; failures in (2) are more likely, but the channel should be totally fine.  I've updated the code, comments, and test to make sure things are OK for (2).  https://github.com/apache/spark/pull/21346/commits/6c086c51873c72fa0cf9f373afd069ac63de3b75
    
    Your points are still valid for (1), though I think we can live with it.
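
The two failure cases described above can be sketched roughly as follows. This is not the code from the linked commit: `Sink`, `Outcome`, and `deliver` are hypothetical names, and `Sink` is only loosely modeled on the `onData`/`onComplete` pair mentioned in the comment. The sketch just shows the distinction being drawn, namely that a failure while consuming chunks is treated as fatal for the connection, while a failure in post-processing fails only the one request.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;

final class StreamFailureSketch {
  /** Hypothetical receiver of streamed chunks. */
  interface Sink {
    void onData(ByteBuffer chunk) throws IOException;
    void onComplete() throws IOException;
  }

  enum Outcome { SUCCESS, RPC_FAILED_CHANNEL_OK, CHANNEL_FAILED }

  static Outcome deliver(List<ByteBuffer> chunks, Sink sink) {
    for (ByteBuffer chunk : chunks) {
      try {
        sink.onData(chunk);
      } catch (Exception e) {
        // Case (1): failure while reading the stream; assumed unrecoverable here.
        return Outcome.CHANNEL_FAILED;
      }
    }
    try {
      sink.onComplete();
    } catch (Exception e) {
      // Case (2): all bytes were consumed, so only this request fails.
      return Outcome.RPC_FAILED_CHANNEL_OK;
    }
    return Outcome.SUCCESS;
  }
}
```

A caller would map `RPC_FAILED_CHANNEL_OK` to an error reply for that request only, and `CHANNEL_FAILED` to tearing down the connection.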


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    **[Test build #91791 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91791/testReport)** for PR 21346 at commit [`3d28a1b`](https://github.com/apache/spark/commit/3d28a1be1c045551f11123507883aaa51248d0aa).


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/92101/
    Test PASSed.


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    **[Test build #4193 has finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/4193/testReport)** for PR 21346 at commit [`83c3271`](https://github.com/apache/spark/commit/83c3271d2f45bbef18d865bddbc6807e9fbd2503).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3561/
    Test PASSed.


---



[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21346#discussion_r191001733
  
    --- Diff: common/network-common/src/test/java/org/apache/spark/network/RpcIntegrationSuite.java ---
    @@ -130,6 +183,59 @@ public void onFailure(Throwable e) {
         return res;
       }
     
    +  private RpcResult sendRpcWithStream(String... streams) throws Exception {
    +    TransportClient client = clientFactory.createClient(TestUtils.getLocalHost(), server.getPort());
    +    final Semaphore sem = new Semaphore(0);
    +    RpcResult res = new RpcResult();
    +    res.successMessages = Collections.synchronizedSet(new HashSet<String>());
    +    res.errorMessages = Collections.synchronizedSet(new HashSet<String>());
    +
    +    for (String stream: streams) {
    --- End diff --
    
    space before `:`


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    **[Test build #92347 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92347/testReport)** for PR 21346 at commit [`58d52b9`](https://github.com/apache/spark/commit/58d52b970b95b1a0bdbb6829371615cbbcf3e936).


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    **[Test build #91195 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91195/testReport)** for PR 21346 at commit [`7bd1b43`](https://github.com/apache/spark/commit/7bd1b43c81a3cdd7b88cf64994cfe8f2b3c5fdf8).
     * This patch **fails due to an unknown error code, -9**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    Merging to master.


---



[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21346#discussion_r195591573
  
    --- Diff: common/network-common/src/test/java/org/apache/spark/network/RpcIntegrationSuite.java ---
    @@ -207,9 +400,67 @@ private void assertErrorsContain(Set<String> errors, Set<String> contains) {
               break;
             }
           }
    -      assertTrue("Could not find error containing " + contain + "; errors: " + errors, foundMatch);
    +      if (!foundMatch) {
    +        notFound.add(contain);
    +      }
    +    }
    +    return new ImmutablePair<>(remainingErrors, notFound);
    +  }
    +
    +  private static class VerifyingStreamCallback implements StreamCallbackWithID {
    +    final String streamId;
    +    final StreamSuite.TestCallback helper;
    +    final OutputStream out;
    +    final File outFile;
    +    VerifyingStreamCallback(String streamId) throws IOException {
    +      if (streamId.equals("file")) {
    +        outFile = File.createTempFile("data", ".tmp", testData.tempDir);
    +        out = new FileOutputStream(outFile);
    +      } else {
    +        out = new ByteArrayOutputStream();
    +        outFile = null;
    +      }
    +      this.streamId = streamId;
    +      helper = new StreamSuite.TestCallback(out);
    +    }
    +
    +    void waitForCompletionAndVerify(long timeoutMs) throws IOException {
    +      helper.waitForCompletion(timeoutMs);
    +      if (streamId.equals("file")) {
    +        assertTrue("File stream did not match.", Files.equal(testData.testFile, outFile));
    +      } else {
    +        byte[] result = ((ByteArrayOutputStream)out).toByteArray();
    +        ByteBuffer srcBuffer = testData.srcBuffer(streamId);
    +        ByteBuffer base;
    +        synchronized (srcBuffer) {
    +          base = srcBuffer.duplicate();
    +        }
    +        byte[] expected = new byte[base.remaining()];
    +        base.get(expected);
    +        assertEquals(expected.length, result.length);
    +        assertTrue("buffers don't match", Arrays.equals(expected, result));
    +
    --- End diff --
    
    nit: remove


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91122/
    Test FAILed.


---



[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21346#discussion_r195591438
  
    --- Diff: common/network-common/src/test/java/org/apache/spark/network/RpcIntegrationSuite.java ---
    @@ -207,9 +400,67 @@ private void assertErrorsContain(Set<String> errors, Set<String> contains) {
               break;
             }
           }
    -      assertTrue("Could not find error containing " + contain + "; errors: " + errors, foundMatch);
    +      if (!foundMatch) {
    +        notFound.add(contain);
    +      }
    +    }
    +    return new ImmutablePair<>(remainingErrors, notFound);
    +  }
    +
    +  private static class VerifyingStreamCallback implements StreamCallbackWithID {
    +    final String streamId;
    +    final StreamSuite.TestCallback helper;
    +    final OutputStream out;
    +    final File outFile;
    +    VerifyingStreamCallback(String streamId) throws IOException {
    --- End diff --
    
    nit: add empty line


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/489/
    Test PASSed.


---



[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21346#discussion_r195585864
  
    --- Diff: common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java ---
    @@ -216,34 +192,99 @@ public long sendRpc(ByteBuffer message, RpcResponseCallback callback) {
           logger.trace("Sending RPC to {}", getRemoteAddress(channel));
         }
     
    -    long requestId = Math.abs(UUID.randomUUID().getLeastSignificantBits());
    +    long requestId = requestId();
         handler.addRpcRequest(requestId, callback);
     
         channel.writeAndFlush(new RpcRequest(requestId, new NioManagedBuffer(message)))
    -        .addListener(future -> {
    -          if (future.isSuccess()) {
    -            long timeTaken = System.currentTimeMillis() - startTime;
    -            if (logger.isTraceEnabled()) {
    -              logger.trace("Sending request {} to {} took {} ms", requestId,
    -                getRemoteAddress(channel), timeTaken);
    -            }
    -          } else {
    -            String errorMsg = String.format("Failed to send RPC %s to %s: %s", requestId,
    -              getRemoteAddress(channel), future.cause());
    -            logger.error(errorMsg, future.cause());
    -            handler.removeRpcRequest(requestId);
    -            channel.close();
    -            try {
    -              callback.onFailure(new IOException(errorMsg, future.cause()));
    -            } catch (Exception e) {
    -              logger.error("Uncaught exception in RPC response callback handler!", e);
    -            }
    -          }
    -        });
    +      .addListener(new RpcChannelListener(startTime, requestId, callback));
     
         return requestId;
       }
     
    +  /**
    +   * Send data to the remote end as a stream.   This differs from stream() in that this is a request
    +   * to *send* data to the remote end, not to receive it from the remote.
    +   *
    +   * @param meta meta data associated with the stream, which will be read completely on the
    +   *             receiving end before the stream itself.
    +   * @param data this will be streamed to the remote end to allow for transferring large amounts
    +   *             of data without reading into memory.
    +   * @param callback handles the reply -- onSuccess will only be called when both message and data
    +   *                 are received successfully.
    +   */
    +  public long uploadStream(
    +      ManagedBuffer meta,
    +      ManagedBuffer data,
    +      RpcResponseCallback callback) {
    +    long startTime = System.currentTimeMillis();
    +    if (logger.isTraceEnabled()) {
    +      logger.trace("Sending RPC to {}", getRemoteAddress(channel));
    +    }
    +
    +    long requestId = requestId();
    +    handler.addRpcRequest(requestId, callback);
    +
    +    channel.writeAndFlush(new UploadStream(requestId, meta, data))
    +      .addListener(new RpcChannelListener(startTime, requestId, callback));
    +
    +    return requestId;
    +  }
    +
    +  private class StdChannelListener
    --- End diff --
    
    I personally try to keep nested classes at the bottom of the enclosing class, but up to you.


---



[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21346#discussion_r192565980
  
    --- Diff: common/network-common/src/main/java/org/apache/spark/network/server/RpcHandler.java ---
    @@ -38,15 +38,24 @@
        *
        * This method will not be called in parallel for a single TransportClient (i.e., channel).
        *
    +   * The rpc *might* included a data stream in <code>streamData</code> (eg. for uploading a large
    +   * amount of data which should not be buffered in memory here).  Any errors while handling the
    +   * streamData will lead to failing this entire connection -- all other in-flight rpcs will fail.
    --- End diff --
    
    I'm trying to think through whether we'll risk introducing any weird new failure modes (or increasing the occurrence of existing-but-improbable failure modes). For example, causing in-flight RPCs to fail could surface latent RPC timeout issues: if we have a timeout which is way too long and we drop in-flight responses on the floor without sending back negative ACKs, then we could see (finite but potentially long) hangs.
    
    On the other hand, this pathway is used for executor <-> executor transfers and generally not executor <-> driver transfers, so my understanding is that failures in this channel generally won't impact control RPCs.
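
To make the hang scenario above concrete, here is a minimal sketch of the alternative to silently dropping in-flight requests: every outstanding callback is failed explicitly when the connection goes away, so callers learn about the failure immediately instead of waiting for a timeout. `OutstandingRpcsSketch` and its `Callback` interface are hypothetical names introduced for illustration; nothing here is claimed to be how this PR or Spark's transport layer handles it.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class OutstandingRpcsSketch {
  /** Hypothetical reply handler for one request. */
  interface Callback {
    void onSuccess(String response);
    void onFailure(Throwable cause);
  }

  private final Map<Long, Callback> outstanding = new ConcurrentHashMap<>();

  void add(long requestId, Callback callback) {
    outstanding.put(requestId, callback);
  }

  void complete(long requestId, String response) {
    Callback callback = outstanding.remove(requestId);
    if (callback != null) {
      callback.onSuccess(response);
    }
  }

  /** Fails every request still waiting for a reply; returns how many were failed. */
  int failAll(Throwable cause) {
    int failed = 0;
    for (Long requestId : outstanding.keySet()) {
      Callback callback = outstanding.remove(requestId);
      if (callback != null) {
        callback.onFailure(cause);
        failed++;
      }
    }
    return failed;
  }

  int size() {
    return outstanding.size();
  }
}
```

Calling `failAll` from the connection-closed path is what turns a potential timeout-length hang into a prompt error on the caller's side.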


---



[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21346#discussion_r192823558
  
    --- Diff: project/MimaExcludes.scala ---
    @@ -36,6 +36,9 @@ object MimaExcludes {
     
       // Exclude rules for 2.4.x
       lazy val v24excludes = v23excludes ++ Seq(
    +    // [SPARK-6237][NETWORK] Network-layer changes to allow stream upload
    +    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.network.netty.NettyBlockRpcServer.receive"),
    --- End diff --
    
    I only see references to them in Scala code... also `private[package]` translates to `public` in Java, so that would at least avoid the mima checks.


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    **[Test build #91450 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91450/testReport)** for PR 21346 at commit [`6c086c5`](https://github.com/apache/spark/commit/6c086c51873c72fa0cf9f373afd069ac63de3b75).


---



[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21346#discussion_r198233111
  
    --- Diff: common/network-common/src/test/java/org/apache/spark/network/RpcIntegrationSuite.java ---
    @@ -207,9 +400,67 @@ private void assertErrorsContain(Set<String> errors, Set<String> contains) {
               break;
             }
           }
    -      assertTrue("Could not find error containing " + contain + "; errors: " + errors, foundMatch);
    +      if (!foundMatch) {
    +        notFound.add(contain);
    +      }
    +    }
    +    return new ImmutablePair<>(remainingErrors, notFound);
    +  }
    +
    +  private static class VerifyingStreamCallback implements StreamCallbackWithID {
    +    final String streamId;
    +    final StreamSuite.TestCallback helper;
    +    final OutputStream out;
    +    final File outFile;
    +    VerifyingStreamCallback(String streamId) throws IOException {
    --- End diff --
    
    ping


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91854/
    Test PASSed.


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21346#discussion_r190998463
  
    --- Diff: common/network-common/src/main/java/org/apache/spark/network/server/RpcHandler.java ---
    @@ -38,15 +38,24 @@
        *
        * This method will not be called in parallel for a single TransportClient (i.e., channel).
        *
    +   * The rpc *might* included a data stream in <code>streamData</code>(eg. for uploading a large
    --- End diff --
    
    space before `(`


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91056/
    Test FAILed.


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/92347/
    Test PASSed.


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    **[Test build #91138 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91138/testReport)** for PR 21346 at commit [`331124b`](https://github.com/apache/spark/commit/331124b125db6b59009e12249542f667a227226e).
     * This patch **fails due to an unknown error code, -9**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21346#discussion_r190997532
  
    --- Diff: common/network-common/src/main/java/org/apache/spark/network/protocol/UploadStream.java ---
    @@ -0,0 +1,107 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.network.protocol;
    +
    +import java.io.IOException;
    +import java.nio.ByteBuffer;
    +
    +import com.google.common.base.Objects;
    +import io.netty.buffer.ByteBuf;
    +
    +import org.apache.spark.network.buffer.ManagedBuffer;
    +import org.apache.spark.network.buffer.NettyManagedBuffer;
    +
    +/**
    + * An RPC with data that is sent outside of the frame, so it can be read in a stream.
    --- End diff --
    
    as a stream?


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    **[Test build #91450 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91450/testReport)** for PR 21346 at commit [`6c086c5`](https://github.com/apache/spark/commit/6c086c51873c72fa0cf9f373afd069ac63de3b75).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    **[Test build #4188 has finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/4188/testReport)** for PR 21346 at commit [`3098b9c`](https://github.com/apache/spark/commit/3098b9cd9ffc29517b446bb660fe5be9f0031cc1).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21346#discussion_r195819061
  
    --- Diff: common/network-common/src/test/java/org/apache/spark/network/RpcIntegrationSuite.java ---
    @@ -130,6 +200,59 @@ public void onFailure(Throwable e) {
         return res;
       }
     
    +  private RpcResult sendRpcWithStream(String... streams) throws Exception {
    +    TransportClient client = clientFactory.createClient(TestUtils.getLocalHost(), server.getPort());
    +    final Semaphore sem = new Semaphore(0);
    +    RpcResult res = new RpcResult();
    +    res.successMessages = Collections.synchronizedSet(new HashSet<String>());
    +    res.errorMessages = Collections.synchronizedSet(new HashSet<String>());
    +
    +    for (String stream : streams) {
    +      int idx = stream.lastIndexOf('/');
    +      ManagedBuffer meta = new NioManagedBuffer(JavaUtils.stringToBytes(stream));
    +      String streamName = (idx == -1) ? stream : stream.substring(idx + 1);
    +      ManagedBuffer data = testData.openStream(conf, streamName);
    +      client.uploadStream(meta, data, new RpcStreamCallback(stream, res, sem));
    +    }
    +
    +    if (!sem.tryAcquire(streams.length, 5, TimeUnit.SECONDS)) {
    +      fail("Timeout getting response from the server");
    +    }
    +    streamCallbacks.values().forEach(streamCallback -> {
    +      try {
    +        streamCallback.waitForCompletionAndVerify(TimeUnit.SECONDS.toMillis(5));
    --- End diff --
    
    Isn't the wait part now redundant, after you waited for the semaphore?


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    **[Test build #4190 has started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/4190/testReport)** for PR 21346 at commit [`7bd1b43`](https://github.com/apache/spark/commit/7bd1b43c81a3cdd7b88cf64994cfe8f2b3c5fdf8).


---



[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21346#discussion_r192565530
  
    --- Diff: common/network-common/src/main/java/org/apache/spark/network/protocol/UploadStream.java ---
    @@ -0,0 +1,107 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.network.protocol;
    +
    +import java.io.IOException;
    +import java.nio.ByteBuffer;
    +
    +import com.google.common.base.Objects;
    +import io.netty.buffer.ByteBuf;
    +
    +import org.apache.spark.network.buffer.ManagedBuffer;
    +import org.apache.spark.network.buffer.NettyManagedBuffer;
    +
    +/**
    + * An RPC with data that is sent outside of the frame, so it can be read as a stream.
    + */
    +public final class UploadStream extends AbstractMessage implements RequestMessage {
    +  /** Used to link an RPC request with its response. */
    +  public final long requestId;
    +  public final ManagedBuffer meta;
    +  public final long bodyByteCount;
    +
    +  public UploadStream(long requestId, ManagedBuffer meta, ManagedBuffer body) {
    +    super(body, false); // body is *not* included in the frame
    +    this.requestId = requestId;
    +    this.meta = meta;
    +    bodyByteCount = body.size();
    +  }
    +
    +  // this version is called when decoding the bytes on the receiving end.  The body is handled
    +  // separately.
    +  private UploadStream(long requestId, ManagedBuffer meta, long bodyByteCount) {
    +    super(null, false);
    +    this.requestId = requestId;
    +    this.meta = meta;
    +    this.bodyByteCount = bodyByteCount;
    +  }
    +
    +  @Override
    +  public Type type() { return Type.UploadStream; }
    +
    +  @Override
    +  public int encodedLength() {
    +    // the requestId, meta size, meta and bodyByteCount (body is not included)
    +    return 8 + 4 + ((int) meta.size()) + 8;
    +  }
    +
    +  @Override
    +  public void encode(ByteBuf buf) {
    +    buf.writeLong(requestId);
    +    try {
    +      ByteBuffer metaBuf = meta.nioByteBuffer();
    +      buf.writeInt(metaBuf.remaining());
    +      buf.writeBytes(metaBuf);
    +    } catch (IOException io) {
    +      throw new RuntimeException(io);
    +    }
    +    buf.writeLong(bodyByteCount);
    +  }
    +
    +  public static UploadStream decode(ByteBuf buf) {
    +    long requestId = buf.readLong();
    +    int metaSize = buf.readInt();
    +    ManagedBuffer meta = new NettyManagedBuffer(buf.readRetainedSlice(metaSize));
    +    long bodyByteCount = buf.readLong();
    +    // This is called by the frame decoder, so the data is still null.  We need a StreamInterceptor
    +    // to read the data.
    +    return new UploadStream(requestId, meta, bodyByteCount);
    +  }
    +
    +  @Override
    +  public int hashCode() {
    +    return Objects.hashCode(requestId, body());
    +  }
    +
    +  @Override
    +  public boolean equals(Object other) {
    +    if (other instanceof UploadStream) {
    +      UploadStream o = (UploadStream) other;
    +      return requestId == o.requestId && super.equals(o);
    +    }
    +    return false;
    +  }
    +
    +  @Override
    +  public String toString() {
    +    return Objects.toStringHelper(this)
    +      .add("requestId", requestId)
    +      .add("body", body())
    --- End diff --
    
    I'm not actually sure. I wonder if this is a latent problem in the old code waiting to happen in case we turn on trace logging. We can probably investigate that separately, but just wanted to note it since it seemed a little dodgy.
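
For reference, the header layout that `encodedLength`, `encode` and `decode` in the diff above agree on can be sketched with plain `java.nio`. This is only an illustration under assumptions: it uses `java.nio.ByteBuffer` instead of netty's `ByteBuf`, and the class `UploadStreamHeaderSketch` and its `Header` type are hypothetical names, not part of the patch.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class UploadStreamHeaderSketch {
  // Header size: requestId (8) + meta length (4) + meta bytes + bodyByteCount (8).
  // The body itself is not part of the frame.
  static int encodedLength(byte[] meta) {
    return 8 + 4 + meta.length + 8;
  }

  // Writes the header fields in the same order as encode() in the diff.
  static ByteBuffer encode(long requestId, byte[] meta, long bodyByteCount) {
    ByteBuffer buf = ByteBuffer.allocate(encodedLength(meta));
    buf.putLong(requestId);
    buf.putInt(meta.length);
    buf.put(meta);
    buf.putLong(bodyByteCount);
    buf.flip();
    return buf;
  }

  // Hypothetical holder for the decoded header; the body would arrive separately as a stream.
  static final class Header {
    final long requestId;
    final byte[] meta;
    final long bodyByteCount;

    Header(long requestId, byte[] meta, long bodyByteCount) {
      this.requestId = requestId;
      this.meta = meta;
      this.bodyByteCount = bodyByteCount;
    }
  }

  // Reads the fields back in the order they were written.
  static Header decode(ByteBuffer buf) {
    long requestId = buf.getLong();
    byte[] meta = new byte[buf.getInt()];
    buf.get(meta);
    long bodyByteCount = buf.getLong();
    return new Header(requestId, meta, bodyByteCount);
  }

  public static void main(String[] args) {
    byte[] meta = "hello".getBytes(StandardCharsets.UTF_8);
    ByteBuffer buf = encode(42L, meta, 3L << 30);
    Header h = decode(buf);
    System.out.println(buf.capacity() + " " + h.requestId + " " + h.bodyByteCount);
  }
}
```

Note that `bodyByteCount` is a `long`, so the header can describe a body larger than 2 GB even though the header itself is small.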


---



[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21346#discussion_r195590730
  
    --- Diff: common/network-common/src/test/java/org/apache/spark/network/RpcIntegrationSuite.java ---
    @@ -130,6 +200,60 @@ public void onFailure(Throwable e) {
         return res;
       }
     
    +  private RpcResult sendRpcWithStream(String... streams) throws Exception {
    +    TransportClient client = clientFactory.createClient(TestUtils.getLocalHost(), server.getPort());
    +    final Semaphore sem = new Semaphore(0);
    +    RpcResult res = new RpcResult();
    +    res.successMessages = Collections.synchronizedSet(new HashSet<String>());
    +    res.errorMessages = Collections.synchronizedSet(new HashSet<String>());
    +
    +    for (String stream : streams) {
    +      int idx = stream.lastIndexOf('/');
    +      ManagedBuffer meta = new NioManagedBuffer(JavaUtils.stringToBytes(stream));
    +      String streamName = (idx == -1) ? stream : stream.substring(idx + 1);
    +      ManagedBuffer data = testData.openStream(conf, streamName);
    +      client.uploadStream(meta, data, new RpcStreamCallback(stream, res, sem));
    +    }
    +    streamCallbacks.values().forEach(streamCallback -> {
    --- End diff --
    
    I'm trying to follow the logic here...
    
    - in L215, client sends a stream to the remote
    - in L82, remote receives the stream and registers the callback
    - here, you wait for the callbacks to finish in the order they were registered.
    
    Isn't there a race between steps 2 and 3, as in you might miss one or more callback registration?
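
The race described above can be illustrated in isolation. The following is a minimal sketch, not the test code from the PR: `RegistrationRaceSketch` and `registerAndWait` are hypothetical names, the "remote" side is simulated with threads, and a `Semaphore` is assumed as the signal that every callback has been registered before the caller iterates over them.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

class RegistrationRaceSketch {
  // Simulates n remote handlers that each register a callback some time after the send.
  // Returns the number of registrations the caller observes after waiting on the semaphore.
  static int registerAndWait(int n, long delayMillis) {
    Map<String, Runnable> callbacks = new ConcurrentHashMap<>();
    Semaphore sem = new Semaphore(0);
    for (int i = 0; i < n; i++) {
      String name = "stream-" + i;
      new Thread(() -> {
        try {
          Thread.sleep(delayMillis); // registration lags behind the send
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
        callbacks.put(name, () -> { });
        sem.release(); // signal only after the callback is visible in the map
      }).start();
    }
    // Without this wait, iterating over callbacks here could see fewer than n entries.
    try {
      if (!sem.tryAcquire(n, 5, TimeUnit.SECONDS)) {
        throw new IllegalStateException("Timeout waiting for registrations");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while waiting", e);
    }
    return callbacks.size();
  }

  public static void main(String[] args) {
    System.out.println(registerAndWait(3, 50));
  }
}
```

Because each permit is released only after the corresponding `put`, acquiring all `n` permits guarantees the caller sees all `n` registrations.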


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    **[Test build #4192 has started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/4192/testReport)** for PR 21346 at commit [`83c3271`](https://github.com/apache/spark/commit/83c3271d2f45bbef18d865bddbc6807e9fbd2503).


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    **[Test build #4192 has finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/4192/testReport)** for PR 21346 at commit [`83c3271`](https://github.com/apache/spark/commit/83c3271d2f45bbef18d865bddbc6807e9fbd2503).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    > is this effectively dead code at this point?
    
    Yes, that's right.  This PR by itself is not useful; it's a step towards https://github.com/apache/spark/pull/21451
    
    This is a good point to put in the PR summary -- I'll do that, and also add your summary notes above, if you don't mind.
    
    > what are the major risks of this change in terms of introducing performance or correctness issues? If we identify risks (e.g. "this is a historically tricky area of code?"), can we mitigate those risks through correctness testing / load testing?
    
    I've made an effort to make minimal modifications to all existing code paths, to minimize the risk of introducing bugs in current functionality.  My intention is to only turn it on by default initially for cases we know would fail with the old code -- when the data is > 2gb ([SPARK-24297](https://issues.apache.org/jira/browse/SPARK-24297)).  I've added unit tests and shared the test I'm doing on a cluster just to find holes in functionality (posted on the parent jira here: https://issues.apache.org/jira/browse/SPARK-6235?focusedCommentId=16484069&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16484069).  I have not done load testing yet but plan to.  Extra testing, of course, would certainly be good.


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/4090/


---



[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21346#discussion_r192825636
  
    --- Diff: common/network-common/src/main/java/org/apache/spark/network/server/RpcHandler.java ---
    @@ -38,15 +38,28 @@
        *
        * This method will not be called in parallel for a single TransportClient (i.e., channel).
        *
    +   * The rpc *might* include a data stream in <code>streamData</code> (e.g. for uploading a large
    +   * amount of data which should not be buffered in memory here).  An error while reading data from
    +   * the stream ({@link org.apache.spark.network.client.StreamCallback#onData(String, ByteBuffer)})
    +   * will fail the entire channel.  A failure in "post-processing" the stream in
    +   * {@link org.apache.spark.network.client.StreamCallback#onComplete(String)} will result in an
    +   * rpcFailure, but the channel will remain active.
    +   *
    +   * If streamData is not null, you *must* call <code>streamData.registerStreamCallback</code>
    +   * before this method returns.
    +   *
        * @param client A channel client which enables the handler to make requests back to the sender
        *               of this RPC. This will always be the exact same object for a particular channel.
        * @param message The serialized bytes of the RPC.
    +   * @param streamData StreamData if there is data which is meant to be read via a StreamCallback;
    --- End diff --
    
    I'm wondering if a separate callback for these streams wouldn't be better. It would at the very least avoid having to change all the existing handlers.
    
    But it would also make it clearer what the contract is. For example, the callback could return the stream callback to be registered. 
    
    It also doesn't seem like `StreamData` itself has a lot of useful information other than the registration method, so it could be replaced with parameters in the new callback, avoiding having to expose that type to RPC handlers.
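
One possible shape for the separate callback suggested above is sketched here. Everything in it is hypothetical and simplified for illustration: `Handler`, `ChunkCallback`, `receiveStream` and `deliver` are stand-ins, not Spark's `RpcHandler` or `StreamCallback`. The point is only that returning the callback makes the contract explicit, and that a default implementation leaves existing handlers untouched.

```java
import java.nio.ByteBuffer;

class SeparateStreamCallbackSketch {
  // Hypothetical, simplified stand-in for a stream callback.
  interface ChunkCallback {
    void onData(ByteBuffer chunk);
    void onComplete();
  }

  // Hypothetical handler interface. Existing handlers only implement receive();
  // stream uploads go through a separate method that returns the callback to register.
  interface Handler {
    void receive(ByteBuffer message);

    default ChunkCallback receiveStream(ByteBuffer meta) {
      throw new UnsupportedOperationException("stream upload not supported");
    }
  }

  // A handler that opts in to streams and counts the bytes it sees.
  static class CountingHandler implements Handler {
    long bytesSeen = 0;
    boolean done = false;

    @Override
    public void receive(ByteBuffer message) { }

    @Override
    public ChunkCallback receiveStream(ByteBuffer meta) {
      return new ChunkCallback() {
        @Override
        public void onData(ByteBuffer chunk) { bytesSeen += chunk.remaining(); }

        @Override
        public void onComplete() { done = true; }
      };
    }
  }

  // What a transport layer might do: obtain the callback first, then feed it the chunks.
  static void deliver(Handler handler, ByteBuffer meta, ByteBuffer... chunks) {
    ChunkCallback callback = handler.receiveStream(meta);
    for (ByteBuffer chunk : chunks) {
      callback.onData(chunk);
    }
    callback.onComplete();
  }

  public static void main(String[] args) {
    CountingHandler handler = new CountingHandler();
    deliver(handler, ByteBuffer.allocate(0), ByteBuffer.allocate(3), ByteBuffer.allocate(4));
    System.out.println(handler.bytesSeen + " " + handler.done);
  }
}
```

With this shape a handler cannot forget to register a callback, because the transport layer cannot proceed without the returned value.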


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    All good questions and stuff I had wondered about too -- I should actually be sure to comment on these on the jira as well:
    
    > I recall that the problem with large shuffle blocks was that the OneForOneBlockFetcher strategy basically read the entire block as a single chunk, which becomes a problem for large blocks. I understand that we have now removed this limitation for shuffles by using a streaming transfer strategy only for large blocks (above some threshold). Is this patch conceptually doing the same thing for push-based communication where the action is initiated by a sender (e.g. to push a block for replication)? 
    
    Yes.
    
    > Does it also affect pull-based remote cache block reads or will that be handled separately?
    
    That was already handled by https://issues.apache.org/jira/browse/SPARK-22062 (despite the title saying it's something else entirely).  That said, I recently discovered that my test of this for large blocks was incorrect, so I need to reconfirm this (I need to rearrange my test a little, and I've got a different aspect of this in flight, so it will probably take a couple of days).
    
    > Given that we already seem to have pull-based openStream() calls which can be initiated from the receive side, could we simplify things here by pushing a "this value is big, pull it" message and then have the remote end initiate a streaming read, similar to how DirectTaskResult and IndirectTaskResult work?
    
    It's certainly possible to do this, and I started taking this approach, but I stopped because [replication is synchronous](https://github.com/apache/spark/blob/bfd75cdfb22a8c2fb005da597621e1ccd3990e82/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L1344).  So you'd have to add a callback for when the block is finally fetched, to go back to this initial call -- but also add timeout logic to avoid waiting forever if the destination went away.  It all seemed much more complicated than doing it the way I'm proposing here.
    
    > For remote reads of large cached blocks: is it true that this works today only if the block is on disk but fails if the block is in memory? If certain size limit problems only occur when things are cached in memory, can we simplify anything if we add a requirement that blocks above 2GB can only be cached on disk (regardless of storage level)?
    
    Correct; I'm currently investigating what we can do to address this.  (Sorry, again I discovered my test was broken shortly after posting this.)  It would certainly simplify things if we only supported this for disk-cached blocks -- what exactly are you proposing?  Just failing when it's cached in memory, and telling the user to rerun with disk caching?  Changing the block manager to automatically _also_ cache on disk when the block is > 2gb?  Or, when sending the block, just writing it to a temp file and then sending from that?
    
    The problem here is on the sending side, not the receiving side; netty uses an [`int` to manage the length of a  `ByteBuf` based msg](https://github.com/netty/netty/blob/netty-4.1.17.Final/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java#L178-L180), but it uses a [`long` for a `FileRegion` based msg](https://github.com/netty/netty/blob/netty-4.1.17.Final/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java#L213-L224)  (code is a little different in the latest on branch 4.1, but same problem is still there).  I'm investigating making a "FileRegion" that is actually backed by a `ChunkedByteBuffer`.
    
    But that would go into another jira under SPARK-6235
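
The `int` versus `long` limit mentioned above can be seen with plain arithmetic. This sketch does not reproduce netty's code; `LengthOverflowSketch` and its methods are hypothetical names that only show what happens when a byte count above 2 GB is forced into an `int`-typed length.

```java
class LengthOverflowSketch {
  // Narrowing a byte count to int silently wraps around for values above Integer.MAX_VALUE.
  static int asIntLength(long byteCount) {
    return (int) byteCount;
  }

  // True if the byte count can be represented by an int-typed length field.
  static boolean fitsInInt(long byteCount) {
    return byteCount >= 0 && byteCount <= Integer.MAX_VALUE;
  }

  public static void main(String[] args) {
    long threeGiB = 3L * 1024 * 1024 * 1024;
    System.out.println(fitsInInt(threeGiB));   // false
    System.out.println(asIntLength(threeGiB)); // -1073741824
    try {
      Math.toIntExact(threeGiB);               // checked narrowing fails loudly instead
    } catch (ArithmeticException e) {
      System.out.println("overflow: " + e.getMessage());
    }
  }
}
```

A `long`-typed count, as used for file-backed transfers, does not have this limit, which is why the disk-backed path can already move blocks larger than 2 GB.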


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    **[Test build #91925 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91925/testReport)** for PR 21346 at commit [`ea4a1f5`](https://github.com/apache/spark/commit/ea4a1f5325495a2b611f67e01e8a86953361b1aa).


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/487/


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/146/


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    It's been a little while since I've thought about this issue, so I have a few clarifying questions to help me understand the high-level changes:
    
    1. I recall that the problem with large shuffle blocks was that the OneForOneBlockFetcher strategy basically read the entire block as a single chunk, which becomes a problem for large blocks. I understand that we have now removed this limitation for shuffles by using a streaming transfer strategy only for large blocks (above some threshold). Is this patch conceptually doing the same thing for push-based communication where the action is initiated by a sender (e.g. to push a block for replication)? Does it also affect pull-based remote cache block reads or will that be handled separately?
    2. Given that we already seem to have pull-based `openStream()` calls which can be initiated from the receive side, could we simplify things here by pushing a "this value is big, pull it" message and then have the remote end initiate a streaming read, similar to how DirectTaskResult and IndirectTaskResult work?
    3. For remote reads of large cached blocks: is it true that this works today _only if_ the block is on disk but fails if the block is in memory? If certain size limit problems only occur when things are cached in memory, can we simplify anything if we add a requirement that blocks above 2GB can _only_ be cached on disk (regardless of storage level)?


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21346#discussion_r196558660
  
    --- Diff: common/network-common/src/test/java/org/apache/spark/network/RpcIntegrationSuite.java ---
    @@ -130,6 +200,59 @@ public void onFailure(Throwable e) {
         return res;
       }
     
    +  private RpcResult sendRpcWithStream(String... streams) throws Exception {
    +    TransportClient client = clientFactory.createClient(TestUtils.getLocalHost(), server.getPort());
    +    final Semaphore sem = new Semaphore(0);
    +    RpcResult res = new RpcResult();
    +    res.successMessages = Collections.synchronizedSet(new HashSet<String>());
    +    res.errorMessages = Collections.synchronizedSet(new HashSet<String>());
    +
    +    for (String stream : streams) {
    +      int idx = stream.lastIndexOf('/');
    +      ManagedBuffer meta = new NioManagedBuffer(JavaUtils.stringToBytes(stream));
    +      String streamName = (idx == -1) ? stream : stream.substring(idx + 1);
    +      ManagedBuffer data = testData.openStream(conf, streamName);
    +      client.uploadStream(meta, data, new RpcStreamCallback(stream, res, sem));
    +    }
    +
    +    if (!sem.tryAcquire(streams.length, 5, TimeUnit.SECONDS)) {
    +      fail("Timeout getting response from the server");
    +    }
    +    streamCallbacks.values().forEach(streamCallback -> {
    +      try {
    +        streamCallback.waitForCompletionAndVerify(TimeUnit.SECONDS.toMillis(5));
    +      } catch (IOException e) {
    --- End diff --
    
    `forEach` doesn't like the IOException
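
The underlying constraint is that the lambda passed to `forEach` is a `java.util.function.Consumer`, whose `accept` method declares no checked exceptions. A minimal sketch of the usual workaround follows; `ForEachCheckedExceptionSketch` and `Verifier` are hypothetical names standing in for the test's callback type.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

class ForEachCheckedExceptionSketch {
  // Hypothetical stand-in for a callback whose verification can fail with a checked exception.
  interface Verifier {
    void verify() throws IOException;
  }

  // Runs every verifier and returns how many completed. The IOException has to be caught
  // inside the lambda and rethrown unchecked, because Consumer.accept cannot throw it.
  static int verifyAll(List<Verifier> verifiers) {
    int[] completed = {0};
    verifiers.forEach(verifier -> {
      try {
        verifier.verify();
        completed[0]++;
      } catch (IOException e) {
        throw new UncheckedIOException(e);
      }
    });
    return completed[0];
  }

  public static void main(String[] args) {
    List<Verifier> verifiers = new ArrayList<>();
    verifiers.add(() -> { });
    verifiers.add(() -> { });
    System.out.println(verifyAll(verifiers));
  }
}
```

A plain `for` loop in a method that declares `throws IOException` would avoid the wrapping altogether; which form reads better is a matter of taste.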


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    **[Test build #4190 has finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/4190/testReport)** for PR 21346 at commit [`7bd1b43`](https://github.com/apache/spark/commit/7bd1b43c81a3cdd7b88cf64994cfe8f2b3c5fdf8).
     * This patch **fails Spark unit tests**.
     * This patch **does not merge cleanly**.
     * This patch adds no public classes.


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91450/
    Test PASSed.


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    Merged build finished. Test FAILed.


---



[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/21346


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    **[Test build #4193 has started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/4193/testReport)** for PR 21346 at commit [`83c3271`](https://github.com/apache/spark/commit/83c3271d2f45bbef18d865bddbc6807e9fbd2503).


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3612/
    Test PASSed.


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    **[Test build #91925 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91925/testReport)** for PR 21346 at commit [`ea4a1f5`](https://github.com/apache/spark/commit/ea4a1f5325495a2b611f67e01e8a86953361b1aa).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21346#discussion_r198236456
  
    --- Diff: common/network-common/src/test/java/org/apache/spark/network/RpcIntegrationSuite.java ---
    @@ -207,9 +400,67 @@ private void assertErrorsContain(Set<String> errors, Set<String> contains) {
               break;
             }
           }
    -      assertTrue("Could not find error containing " + contain + "; errors: " + errors, foundMatch);
    +      if (!foundMatch) {
    +        notFound.add(contain);
    +      }
    +    }
    +    return new ImmutablePair<>(remainingErrors, notFound);
    +  }
    +
    +  private static class VerifyingStreamCallback implements StreamCallbackWithID {
    +    final String streamId;
    +    final StreamSuite.TestCallback helper;
    +    final OutputStream out;
    +    final File outFile;
    +    VerifyingStreamCallback(String streamId) throws IOException {
    --- End diff --
    
    Whoops, sorry I missed this one. Fixed now.


---



[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

Posted by witgo <gi...@git.apache.org>.
Github user witgo commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21346#discussion_r195284967
  
    --- Diff: common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java ---
    @@ -220,30 +196,91 @@ public long sendRpc(ByteBuffer message, RpcResponseCallback callback) {
         handler.addRpcRequest(requestId, callback);
     
         channel.writeAndFlush(new RpcRequest(requestId, new NioManagedBuffer(message)))
    -        .addListener(future -> {
    -          if (future.isSuccess()) {
    -            long timeTaken = System.currentTimeMillis() - startTime;
    -            if (logger.isTraceEnabled()) {
    -              logger.trace("Sending request {} to {} took {} ms", requestId,
    -                getRemoteAddress(channel), timeTaken);
    -            }
    -          } else {
    -            String errorMsg = String.format("Failed to send RPC %s to %s: %s", requestId,
    -              getRemoteAddress(channel), future.cause());
    -            logger.error(errorMsg, future.cause());
    -            handler.removeRpcRequest(requestId);
    -            channel.close();
    -            try {
    -              callback.onFailure(new IOException(errorMsg, future.cause()));
    -            } catch (Exception e) {
    -              logger.error("Uncaught exception in RPC response callback handler!", e);
    -            }
    -          }
    -        });
    +      .addListener(new RpcChannelListener(startTime, requestId, callback));
    +
    +    return requestId;
    +  }
    +
    +  /**
    +   * Send data to the remote end as a stream.   This differs from stream() in that this is a request
    +   * to *send* data to the remote end, not to receive it from the remote.
    +   *
    +   * @param meta meta data associated with the stream, which will be read completely on the
    +   *             receiving end before the stream itself.
    +   * @param data this will be streamed to the remote end to allow for transferring large amounts
    +   *             of data without reading into memory.
    +   * @param callback handles the reply -- onSuccess will only be called when both message and data
    +   *                 are received successfully.
    +   */
    +  public long uploadStream(
    +      ManagedBuffer meta,
    +      ManagedBuffer data,
    +      RpcResponseCallback callback) {
    +    long startTime = System.currentTimeMillis();
    +    if (logger.isTraceEnabled()) {
    +      logger.trace("Sending RPC to {}", getRemoteAddress(channel));
    +    }
    +
    +    long requestId = Math.abs(UUID.randomUUID().getLeastSignificantBits());
    --- End diff --
    
    `Math.abs(UUID.randomUUID().getLeastSignificantBits())` is repeated twice. Move it to a separate method.
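
    A small sketch of the suggested extraction, using only the JDK. The helper name `requestId()` and the wrapper class `RequestIds` are illustrative assumptions, not necessarily what the patch ends up with.

```java
import java.util.UUID;

class RequestIds {
  /**
   * Single place to generate request ids, using the same expression as the quoted diff.
   * Note that Math.abs(Long.MIN_VALUE) is still negative, so this is only
   * "non-negative" up to that one extremely unlikely value.
   */
  static long requestId() {
    return Math.abs(UUID.randomUUID().getLeastSignificantBits());
  }

  public static void main(String[] args) {
    // Both call sites (e.g. an RPC send and a stream upload) would share the helper.
    long rpcRequestId = requestId();
    long uploadRequestId = requestId();
    System.out.println(rpcRequestId + " " + uploadRequestId);
  }
}
```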


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91121/
    Test FAILed.


---



[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21346#discussion_r195584794
  
    --- Diff: common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java ---
    @@ -216,34 +192,99 @@ public long sendRpc(ByteBuffer message, RpcResponseCallback callback) {
           logger.trace("Sending RPC to {}", getRemoteAddress(channel));
         }
     
    -    long requestId = Math.abs(UUID.randomUUID().getLeastSignificantBits());
    +    long requestId = requestId();
         handler.addRpcRequest(requestId, callback);
     
         channel.writeAndFlush(new RpcRequest(requestId, new NioManagedBuffer(message)))
    -        .addListener(future -> {
    -          if (future.isSuccess()) {
    -            long timeTaken = System.currentTimeMillis() - startTime;
    -            if (logger.isTraceEnabled()) {
    -              logger.trace("Sending request {} to {} took {} ms", requestId,
    -                getRemoteAddress(channel), timeTaken);
    -            }
    -          } else {
    -            String errorMsg = String.format("Failed to send RPC %s to %s: %s", requestId,
    -              getRemoteAddress(channel), future.cause());
    -            logger.error(errorMsg, future.cause());
    -            handler.removeRpcRequest(requestId);
    -            channel.close();
    -            try {
    -              callback.onFailure(new IOException(errorMsg, future.cause()));
    -            } catch (Exception e) {
    -              logger.error("Uncaught exception in RPC response callback handler!", e);
    -            }
    -          }
    -        });
    +      .addListener(new RpcChannelListener(startTime, requestId, callback));
     
         return requestId;
       }
     
    +  /**
    +   * Send data to the remote end as a stream.   This differs from stream() in that this is a request
    --- End diff --
    
    I know you're in the "2 spaces after a period" camp, but that's 3.


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    **[Test build #4189 has started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/4189/testReport)** for PR 21346 at commit [`331124b`](https://github.com/apache/spark/commit/331124b125db6b59009e12249542f667a227226e).


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    **[Test build #90694 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90694/testReport)** for PR 21346 at commit [`fa3ac4e`](https://github.com/apache/spark/commit/fa3ac4e8b0c80964e93e9e777106f97324308293).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    I'm going to be starting a more detailed review pass on this now and will be getting caught back up with the discussion that's happened so far.
    
    One high-level point I'd like to keep in mind: what are the major risks of this change in terms of introducing performance or correctness issues? If we identify risks (e.g. "this is a historically tricky area of code"), can we mitigate them through correctness testing or load testing?


---



[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21346#discussion_r195584408
  
    --- Diff: common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java ---
    @@ -141,26 +141,14 @@ public void fetchChunk(
         StreamChunkId streamChunkId = new StreamChunkId(streamId, chunkIndex);
         handler.addFetchRequest(streamChunkId, callback);
     
    -    channel.writeAndFlush(new ChunkFetchRequest(streamChunkId)).addListener(future -> {
    -      if (future.isSuccess()) {
    -        long timeTaken = System.currentTimeMillis() - startTime;
    -        if (logger.isTraceEnabled()) {
    -          logger.trace("Sending request {} to {} took {} ms", streamChunkId,
    -            getRemoteAddress(channel), timeTaken);
    +    channel.writeAndFlush(new ChunkFetchRequest(streamChunkId))
    +      .addListener( new StdChannelListener(startTime, streamChunkId) {
    --- End diff --
    
    nit: no space after `(`


---



[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21346#discussion_r195592355
  
    --- Diff: common/network-common/src/test/java/org/apache/spark/network/StreamTestHelper.java ---
    @@ -0,0 +1,102 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.network;
    +
    +import java.io.File;
    +import java.io.FileOutputStream;
    +import java.nio.ByteBuffer;
    +import java.util.Random;
    +
    +import com.google.common.io.Files;
    +
    +import org.apache.spark.network.buffer.FileSegmentManagedBuffer;
    +import org.apache.spark.network.buffer.ManagedBuffer;
    +import org.apache.spark.network.buffer.NioManagedBuffer;
    +import org.apache.spark.network.util.TransportConf;
    +
    +class StreamTestHelper {
    +  static final String[] STREAMS = { "largeBuffer", "smallBuffer", "emptyBuffer", "file" };
    +
    +  final File testFile;
    +  File tempDir;
    +
    +  ByteBuffer emptyBuffer;
    +  ByteBuffer smallBuffer;
    +  ByteBuffer largeBuffer;
    +
    +  private static ByteBuffer createBuffer(int bufSize) {
    +    ByteBuffer buf = ByteBuffer.allocate(bufSize);
    +    for (int i = 0; i < bufSize; i ++) {
    +      buf.put((byte) i);
    +    }
    +    buf.flip();
    +    return buf;
    +  }
    +
    +  StreamTestHelper() throws Exception {
    +    tempDir = Files.createTempDir();
    +    emptyBuffer = createBuffer(0);
    +    smallBuffer = createBuffer(100);
    +    largeBuffer = createBuffer(100000);
    +
    +    testFile = File.createTempFile("stream-test-file", "txt", tempDir);
    +    FileOutputStream fp = new FileOutputStream(testFile);
    +    try {
    +      Random rnd = new Random();
    +      for (int i = 0; i < 512; i++) {
    +        byte[] fileContent = new byte[1024];
    +        rnd.nextBytes(fileContent);
    +        fp.write(fileContent);
    +      }
    +    } finally {
    +      fp.close();
    +    }
    +  }
    +
    +  public ByteBuffer srcBuffer(String name) {
    +    switch (name) {
    +      case "largeBuffer":
    +        return largeBuffer;
    +      case "smallBuffer":
    +        return smallBuffer;
    +      case "emptyBuffer":
    +        return emptyBuffer;
    +      default:
    +        throw new IllegalArgumentException("Invalid stream: " + name);
    +    }
    +  }
    +
    +  public ManagedBuffer openStream(TransportConf conf, String streamId) {
    +    switch (streamId) {
    +      case "file":
    +        return new FileSegmentManagedBuffer(conf, testFile, 0, testFile.length());
    +      default:
    +        return new NioManagedBuffer(srcBuffer(streamId));
    +    }
    +  }
    +
    +
    +  void cleanup() {
    +    if (tempDir != null) {
    +      for (File f : tempDir.listFiles()) {
    --- End diff --
    
    Use `JavaUtils.deleteRecursively` here instead of deleting the files by hand.
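
    For illustration only, a JDK-only stand-in for what a recursive delete does. This is not Spark's `JavaUtils` implementation, and the class name `RecursiveDelete` is made up for the sketch; in the test helper itself, `cleanup()` would simply delegate to the existing utility.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;

class RecursiveDelete {
  /**
   * Deletes a file, or a directory and everything under it.
   * Simplified sketch: it does not special-case symbolic links.
   */
  static void deleteRecursively(File file) throws IOException {
    if (file == null || !file.exists()) {
      return;
    }
    File[] children = file.listFiles();  // null when 'file' is not a directory
    if (children != null) {
      for (File child : children) {
        deleteRecursively(child);
      }
    }
    // Files.delete throws with a useful message if the delete fails.
    Files.delete(file.toPath());
  }

  public static void main(String[] args) throws IOException {
    File dir = Files.createTempDirectory("stream-test").toFile();
    File nested = new File(dir, "nested");
    if (!nested.mkdir()) {
      throw new IOException("could not create " + nested);
    }
    Files.write(new File(nested, "data.bin").toPath(), new byte[] {1, 2, 3});
    deleteRecursively(dir);
    System.out.println("exists after delete: " + dir.exists());
  }
}
```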


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3518/
    Test FAILed.


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    Merged build finished. Test FAILed.


---



[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21346#discussion_r191979425
  
    --- Diff: common/network-common/src/main/java/org/apache/spark/network/protocol/UploadStream.java ---
    @@ -0,0 +1,107 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.network.protocol;
    +
    +import java.io.IOException;
    +import java.nio.ByteBuffer;
    +
    +import com.google.common.base.Objects;
    +import io.netty.buffer.ByteBuf;
    +
    +import org.apache.spark.network.buffer.ManagedBuffer;
    +import org.apache.spark.network.buffer.NettyManagedBuffer;
    +
    +/**
    + * An RPC with data that is sent outside of the frame, so it can be read as a stream.
    + */
    +public final class UploadStream extends AbstractMessage implements RequestMessage {
    +  /** Used to link an RPC request with its response. */
    +  public final long requestId;
    +  public final ManagedBuffer meta;
    +  public final long bodyByteCount;
    +
    +  public UploadStream(long requestId, ManagedBuffer meta, ManagedBuffer body) {
    +    super(body, false); // body is *not* included in the frame
    +    this.requestId = requestId;
    +    this.meta = meta;
    +    bodyByteCount = body.size();
    +  }
    +
    +  // this version is called when decoding the bytes on the receiving end.  The body is handled
    +  // separately.
    +  private UploadStream(long requestId, ManagedBuffer meta, long bodyByteCount) {
    +    super(null, false);
    +    this.requestId = requestId;
    +    this.meta = meta;
    +    this.bodyByteCount = bodyByteCount;
    +  }
    +
    +  @Override
    +  public Type type() { return Type.UploadStream; }
    +
    +  @Override
    +  public int encodedLength() {
    +    // the requestId, meta size, meta and bodyByteCount (body is not included)
    +    return 8 + 4 + ((int) meta.size()) + 8;
    +  }
    +
    +  @Override
    +  public void encode(ByteBuf buf) {
    +    buf.writeLong(requestId);
    +    try {
    +      ByteBuffer metaBuf = meta.nioByteBuffer();
    +      buf.writeInt(metaBuf.remaining());
    +      buf.writeBytes(metaBuf);
    +    } catch (IOException io) {
    +      throw new RuntimeException(io);
    +    }
    +    buf.writeLong(bodyByteCount);
    +  }
    +
    +  public static UploadStream decode(ByteBuf buf) {
    +    long requestId = buf.readLong();
    +    int metaSize = buf.readInt();
    +    ManagedBuffer meta = new NettyManagedBuffer(buf.readRetainedSlice(metaSize));
    +    long bodyByteCount = buf.readLong();
    +    // This is called by the frame decoder, so the data is still null.  We need a StreamInterceptor
    +    // to read the data.
    +    return new UploadStream(requestId, meta, bodyByteCount);
    +  }
    +
    +  @Override
    +  public int hashCode() {
    +    return Objects.hashCode(requestId, body());
    +  }
    +
    +  @Override
    +  public boolean equals(Object other) {
    +    if (other instanceof UploadStream) {
    +      UploadStream o = (UploadStream) other;
    +      return requestId == o.requestId && super.equals(o);
    +    }
    +    return false;
    +  }
    +
    +  @Override
    +  public String toString() {
    +    return Objects.toStringHelper(this)
    +      .add("requestId", requestId)
    +      .add("body", body())
    --- End diff --
    
    To be honest, this was also just parroted from other classes. Looking now at the implementations of `ManagedBuffer`, the ones that have a `toString()` do something reasonable.
    
    Is that actually useful for debugging? Maybe not; I don't think I ever actually looked at this.


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    **[Test build #91793 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91793/testReport)** for PR 21346 at commit [`8a18da5`](https://github.com/apache/spark/commit/8a18da511a8053fc4dcf6529f49333d71bd6277d).


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/107/
    Test PASSed.


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    Summary of key changes (WIP; notes to self):
    
    
    > Summary of changes:
    > 
    > - Introduce a new `UploadStream` RPC which is sent to push a large payload as a stream (in contrast, the pre-existing `StreamRequest` and `StreamResponse` RPCs are used for pull-based streaming).
    > - Generalize `RpcHandler.receive()` to support requests which contain streams.
    > - Generalize `StreamInterceptor` to handle both request and response messages (previously it only handled responses).
    > - Introduce `StdChannelListener` to abstract away common logging logic in `ChannelFuture` listeners.
    
    Question: is this effectively dead code at this point? In other words, this PR just adds the lower-level pieces but there's nothing currently using the new API? So this patch as of now has no behavior change and actual functional changes impacting queries / actual usage will come later when we wire this up to the block replicator?
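
    As a reading aid for the summary above, here is a sketch of the `UploadStream` header layout as it appears in the quoted diff: request id (8 bytes), meta length (4 bytes), the meta bytes, then the body byte count (8 bytes), with the body itself sent outside the frame. The sketch uses plain `java.nio.ByteBuffer` rather than Netty's `ByteBuf`, and the class and field names are made up for illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class UploadStreamHeaderSketch {
  /** Decoded header fields; the streamed body is deliberately not part of this. */
  static final class Header {
    final long requestId;
    final byte[] meta;
    final long bodyByteCount;

    Header(long requestId, byte[] meta, long bodyByteCount) {
      this.requestId = requestId;
      this.meta = meta;
      this.bodyByteCount = bodyByteCount;
    }
  }

  /** Writes requestId, meta length, meta bytes, then the body byte count. */
  static ByteBuffer encode(long requestId, byte[] meta, long bodyByteCount) {
    ByteBuffer buf = ByteBuffer.allocate(8 + 4 + meta.length + 8);
    buf.putLong(requestId);
    buf.putInt(meta.length);
    buf.put(meta);
    buf.putLong(bodyByteCount);
    buf.flip();  // switch from writing to reading
    return buf;
  }

  /** Reads the fields back in the same order they were written. */
  static Header decode(ByteBuffer buf) {
    long requestId = buf.getLong();
    byte[] meta = new byte[buf.getInt()];
    buf.get(meta);
    long bodyByteCount = buf.getLong();
    return new Header(requestId, meta, bodyByteCount);
  }

  public static void main(String[] args) {
    byte[] meta = "blockId".getBytes(StandardCharsets.UTF_8);
    ByteBuffer encoded = encode(42L, meta, 1_000_000L);
    int headerBytes = encoded.remaining();
    Header header = decode(encoded);
    System.out.println(headerBytes + " header bytes, body of "
        + header.bodyByteCount + " bytes to be streamed separately");
  }
}
```

    Because only the byte count of the body travels in the header, the receiver can read the meta completely and then hand the remaining bytes to a stream handler instead of buffering them.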


---



[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

Posted by witgo <gi...@git.apache.org>.
Github user witgo commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21346#discussion_r195287202
  
    --- Diff: common/network-common/src/main/java/org/apache/spark/network/protocol/UploadStream.java ---
    @@ -0,0 +1,107 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.network.protocol;
    +
    +import java.io.IOException;
    +import java.nio.ByteBuffer;
    +
    +import com.google.common.base.Objects;
    +import io.netty.buffer.ByteBuf;
    +
    +import org.apache.spark.network.buffer.ManagedBuffer;
    +import org.apache.spark.network.buffer.NettyManagedBuffer;
    +
    +/**
    + * An RPC with data that is sent outside of the frame, so it can be read as a stream.
    + */
    +public final class UploadStream extends AbstractMessage implements RequestMessage {
    --- End diff --
    
    Is it possible to merge `UploadStream` and `RpcRequest` into a single class?


---



[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21346#discussion_r195464525
  
    --- Diff: common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java ---
    @@ -220,30 +196,91 @@ public long sendRpc(ByteBuffer message, RpcResponseCallback callback) {
         handler.addRpcRequest(requestId, callback);
     
         channel.writeAndFlush(new RpcRequest(requestId, new NioManagedBuffer(message)))
    -        .addListener(future -> {
    -          if (future.isSuccess()) {
    -            long timeTaken = System.currentTimeMillis() - startTime;
    -            if (logger.isTraceEnabled()) {
    -              logger.trace("Sending request {} to {} took {} ms", requestId,
    -                getRemoteAddress(channel), timeTaken);
    -            }
    -          } else {
    -            String errorMsg = String.format("Failed to send RPC %s to %s: %s", requestId,
    -              getRemoteAddress(channel), future.cause());
    -            logger.error(errorMsg, future.cause());
    -            handler.removeRpcRequest(requestId);
    -            channel.close();
    -            try {
    -              callback.onFailure(new IOException(errorMsg, future.cause()));
    -            } catch (Exception e) {
    -              logger.error("Uncaught exception in RPC response callback handler!", e);
    -            }
    -          }
    -        });
    +      .addListener(new RpcChannelListener(startTime, requestId, callback));
    +
    +    return requestId;
    +  }
    +
    +  /**
    +   * Send data to the remote end as a stream.   This differs from stream() in that this is a request
    +   * to *send* data to the remote end, not to receive it from the remote.
    +   *
    +   * @param meta meta data associated with the stream, which will be read completely on the
    +   *             receiving end before the stream itself.
    +   * @param data this will be streamed to the remote end to allow for transferring large amounts
    +   *             of data without reading into memory.
    +   * @param callback handles the reply -- onSuccess will only be called when both message and data
    +   *                 are received successfully.
    +   */
    +  public long uploadStream(
    +      ManagedBuffer meta,
    +      ManagedBuffer data,
    +      RpcResponseCallback callback) {
    +    long startTime = System.currentTimeMillis();
    +    if (logger.isTraceEnabled()) {
    +      logger.trace("Sending RPC to {}", getRemoteAddress(channel));
    +    }
    +
    +    long requestId = Math.abs(UUID.randomUUID().getLeastSignificantBits());
    --- End diff --
    
    done


---



[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21346#discussion_r192566116
  
    --- Diff: common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java ---
    @@ -141,26 +141,14 @@ public void fetchChunk(
         StreamChunkId streamChunkId = new StreamChunkId(streamId, chunkIndex);
         handler.addFetchRequest(streamChunkId, callback);
     
    -    channel.writeAndFlush(new ChunkFetchRequest(streamChunkId)).addListener(future -> {
    --- End diff --
    
    Thanks for explaining. I guess the re-ordering of `channel.close()` and the `handler` operations is safe because the handler doesn't hold references to the channel or otherwise interact with it, and doesn't hold references to objects tied to the channel lifecycle (like buffers)?


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    **[Test build #90693 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90693/testReport)** for PR 21346 at commit [`49e0a80`](https://github.com/apache/spark/commit/49e0a80f89433368d3a3116eb9fcd7854ceecb62).
     * This patch **fails MiMa tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `public class StreamInterceptor<T extends Message> implements TransportFrameDecoder.Interceptor `
      * `public final class UploadStream extends AbstractMessage implements RequestMessage `
      * `public class StreamData `


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21346#discussion_r191978545
  
    --- Diff: common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java ---
    @@ -141,26 +141,14 @@ public void fetchChunk(
         StreamChunkId streamChunkId = new StreamChunkId(streamId, chunkIndex);
         handler.addFetchRequest(streamChunkId, callback);
     
    -    channel.writeAndFlush(new ChunkFetchRequest(streamChunkId)).addListener(future -> {
    --- End diff --
    
    yes exactly.  Marcelo asked for this refactoring in his review -- there was already a ton of copy-paste, and instead of adding more it made sense to refactor.  There shouldn't be any behavior change (there are minor changes that shouldn't matter: `channel.close()` now happens before the more specific cleanup operations, whereas it was in the middle previously, and the `try` encompasses a bit more than before).


---



[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21346#discussion_r191002393
  
    --- Diff: common/network-common/src/test/java/org/apache/spark/network/RpcIntegrationSuite.java ---
    @@ -130,6 +183,59 @@ public void onFailure(Throwable e) {
         return res;
       }
     
    +  private RpcResult sendRpcWithStream(String... streams) throws Exception {
    +    TransportClient client = clientFactory.createClient(TestUtils.getLocalHost(), server.getPort());
    +    final Semaphore sem = new Semaphore(0);
    +    RpcResult res = new RpcResult();
    +    res.successMessages = Collections.synchronizedSet(new HashSet<String>());
    +    res.errorMessages = Collections.synchronizedSet(new HashSet<String>());
    +
    +    for (String stream: streams) {
    +      int idx = stream.lastIndexOf('/');
    +      ManagedBuffer meta = new NioManagedBuffer(JavaUtils.stringToBytes(stream));
    +      String streamName = (idx == -1) ? stream : stream.substring(idx + 1);
    +      ManagedBuffer data = testData.openStream(conf, streamName);
    +      client.uploadStream(meta, data, new RpcStreamCallback(stream, res, sem));
    +    }
    +    streamCallbacks.values().forEach(streamCallback -> {
    +      try {
    +        streamCallback.waitForCompletionAndVerify(TimeUnit.SECONDS.toMillis(5));
    +      } catch (IOException e) {
    +        throw new RuntimeException(e);
    +      }
    +    });
    +
    +
    +    if (!sem.tryAcquire(streams.length, 5, TimeUnit.SECONDS)) {
    +      fail("Timeout getting response from the server");
    +    }
    +    client.close();
    +    return res;
    +  }
    +
    +  private static class RpcStreamCallback implements RpcResponseCallback {
    +    final String streamId;
    +    final RpcResult res;
    +    final Semaphore sem;
    +    RpcStreamCallback(String streamId, RpcResult res, Semaphore sem) {
    --- End diff --
    
    add empty line


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    yeah I see what you're saying about better error handling, but I'd really rather not take that on here.  I think some prior attempts at solving the 2GB limit have tried to take on too much, and I'd like to keep this as simple as possible, and leave more for future improvements.  I guess it means that when (if) we do make the changes you're proposing, we'd have to go back to changing the network layer again, possibly introducing new message types etc.  But we're not really painting ourselves into a corner at all, we can do that if it becomes necessary.
    
    fwiw, there are other things that are higher on my list to fix when the basic functionality goes in:
    1) when you do a remote read of cached data, even if you fetch to disk, you memory map the entire file, rather than just using a FileInputStream
    2) if you replicate a disk-cached block, it'll get written to disk to a temp file, then read back from that file into memory, and then written to the new location.
    3) when you do a remote read of cached data, you shouldn't actually have to wait till you fetch all the data, you should just be able to treat it as an inputstream
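
Item 1 above contrasts memory-mapping a whole file with reading it through a stream. As a rough illustration only (this is not Spark code; the class name `ChunkedRead` and the method `countBytes` are hypothetical), a stream-based read keeps memory use bounded by the buffer size regardless of file size:

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;

public class ChunkedRead {
  /** Streams the file through a small fixed buffer and returns the total bytes seen. */
  static long countBytes(Path file, int bufferSize) throws IOException {
    byte[] buf = new byte[bufferSize];
    long total = 0;
    try (InputStream in = Files.newInputStream(file)) {
      int n;
      while ((n = in.read(buf)) != -1) {
        total += n; // a real consumer would process buf[0..n) here
      }
    }
    return total;
  }

  public static void main(String[] args) throws IOException {
    // Hypothetical stand-in for a fetched block that was written to disk.
    Path tmp = Files.createTempFile("block", ".bin");
    try {
      Files.write(tmp, new byte[100_000]);
      System.out.println(countBytes(tmp, 4096)); // prints 100000
    } finally {
      Files.deleteIfExists(tmp);
    }
  }
}
```

Only one 4 KB buffer is live at any time here, whereas mapping the file ties the mapped region's size to the size of the block.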


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3570/
    Test PASSed.


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    >  I've posted an overall design doc, and shared the tests I'm running on a cluster.
    
    Where did you post those? Couldn't find them on the bug, nor the bug linked from that bug.


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    Merged build finished. Test FAILed.


---



[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21346#discussion_r195795803
  
    --- Diff: common/network-common/src/test/java/org/apache/spark/network/RpcIntegrationSuite.java ---
    @@ -130,6 +200,60 @@ public void onFailure(Throwable e) {
         return res;
       }
     
    +  private RpcResult sendRpcWithStream(String... streams) throws Exception {
    +    TransportClient client = clientFactory.createClient(TestUtils.getLocalHost(), server.getPort());
    +    final Semaphore sem = new Semaphore(0);
    +    RpcResult res = new RpcResult();
    +    res.successMessages = Collections.synchronizedSet(new HashSet<String>());
    +    res.errorMessages = Collections.synchronizedSet(new HashSet<String>());
    +
    +    for (String stream : streams) {
    +      int idx = stream.lastIndexOf('/');
    +      ManagedBuffer meta = new NioManagedBuffer(JavaUtils.stringToBytes(stream));
    +      String streamName = (idx == -1) ? stream : stream.substring(idx + 1);
    +      ManagedBuffer data = testData.openStream(conf, streamName);
    +      client.uploadStream(meta, data, new RpcStreamCallback(stream, res, sem));
    +    }
    +    streamCallbacks.values().forEach(streamCallback -> {
    --- End diff --
    
    oh great point, thanks for catching that.  I will move this after the semaphore check; that will ensure that everything has been added to `streamCallbacks`
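
The ordering concern can be sketched in isolation. The example below is not the test from the PR; `WaitThenVerify`, `runAndCount`, and the `registered` map are hypothetical stand-ins, assuming that entries are registered on other threads and that each task releases a permit only after it has registered:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class WaitThenVerify {
  /** Runs n async tasks that register themselves, and returns how many entries were visible. */
  static int runAndCount(int n) throws InterruptedException {
    Map<String, Boolean> registered = new ConcurrentHashMap<>();
    Semaphore sem = new Semaphore(0);
    ExecutorService pool = Executors.newFixedThreadPool(4);
    try {
      for (int i = 0; i < n; i++) {
        String id = "stream-" + i;
        pool.submit(() -> {
          registered.put(id, Boolean.TRUE); // registration happens on another thread
          sem.release();                    // signal only after registering
        });
      }
      // Wait for every task first; iterating the map before this point could miss entries.
      if (!sem.tryAcquire(n, 5, TimeUnit.SECONDS)) {
        throw new IllegalStateException("Timed out waiting for tasks");
      }
      return registered.size();
    } finally {
      pool.shutdownNow();
    }
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println(runAndCount(8)); // prints 8
  }
}
```

Because each permit is released after the corresponding `put`, acquiring all `n` permits guarantees the map is fully populated before it is inspected.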


---



[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21346#discussion_r195588878
  
    --- Diff: common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java ---
    @@ -203,6 +197,76 @@ public void onFailure(Throwable e) {
         }
       }
     
    +  /**
    +   * Handle a request from the client to upload a stream of data.
    +   */
    +  private void processStreamUpload(final UploadStream req) {
    +    assert (req.body() == null);
    +    try {
    +      RpcResponseCallback callback = new RpcResponseCallback() {
    +        @Override
    +        public void onSuccess(ByteBuffer response) {
    +          respond(new RpcResponse(req.requestId, new NioManagedBuffer(response)));
    +        }
    +
    +        @Override
    +        public void onFailure(Throwable e) {
    +          respond(new RpcFailure(req.requestId, Throwables.getStackTraceAsString(e)));
    +        }
    +      };
    +      TransportFrameDecoder frameDecoder = (TransportFrameDecoder)
    +          channel.pipeline().get(TransportFrameDecoder.HANDLER_NAME);
    +      ByteBuffer meta = req.meta.nioByteBuffer();
    +      StreamCallbackWithID streamHandler = rpcHandler.receiveStream(reverseClient, meta, callback);
    +      StreamCallbackWithID wrappedCallback = new StreamCallbackWithID() {
    +        @Override
    +        public void onData(String streamId, ByteBuffer buf) throws IOException {
    +          streamHandler.onData(streamId, buf);
    +        }
    +
    +        @Override
    +        public void onComplete(String streamId) throws IOException {
    +           try {
    +             streamHandler.onComplete(streamId);
    +             callback.onSuccess(ByteBuffer.allocate(0));
    +           } catch (Exception ex) {
    +             IOException ioExc = new IOException("Failure post-processing complete stream;" +
    +               " failing this rpc and leaving channel active");
    +             callback.onFailure(ioExc);
    +             streamHandler.onFailure(streamId, ioExc);
    +           }
    +        }
    +
    +        @Override
    +        public void onFailure(String streamId, Throwable cause) throws IOException {
    +          callback.onFailure(new IOException("Destination failed while reading stream", cause));
    +          streamHandler.onFailure(streamId, cause);
    +        }
    +
    +        @Override
    +        public String getID() {
    +          return streamHandler.getID();
    +        }
    +      };
    +      if (req.bodyByteCount > 0) {
    +        StreamInterceptor interceptor = new StreamInterceptor(this, wrappedCallback.getID(),
    +          req.bodyByteCount, wrappedCallback);
    +        frameDecoder.setInterceptor(interceptor);
    +      } else {
    +        wrappedCallback.onComplete(wrappedCallback.getID());
    +      }
    +    } catch (Exception e) {
    +      logger.error("Error while invoking RpcHandler#receive() on RPC id " + req.requestId, e);
    +      respond(new RpcFailure(req.requestId, Throwables.getStackTraceAsString(e)));
    +      // We choose to totally fail the channel, rather than trying to recover as we do in other
    +      // cases.  We don't know how many bytes of the stream the client has already sent for the
    +      // stream, its not worth trying to recover.
    --- End diff --
    
    it's


---



[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21346#discussion_r191939431
  
    --- Diff: common/network-common/src/main/java/org/apache/spark/network/protocol/UploadStream.java ---
    @@ -0,0 +1,107 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.network.protocol;
    +
    +import java.io.IOException;
    +import java.nio.ByteBuffer;
    +
    +import com.google.common.base.Objects;
    +import io.netty.buffer.ByteBuf;
    +
    +import org.apache.spark.network.buffer.ManagedBuffer;
    +import org.apache.spark.network.buffer.NettyManagedBuffer;
    +
    +/**
    + * An RPC with data that is sent outside of the frame, so it can be read as a stream.
    + */
    +public final class UploadStream extends AbstractMessage implements RequestMessage {
    +  /** Used to link an RPC request with its response. */
    +  public final long requestId;
    +  public final ManagedBuffer meta;
    +  public final long bodyByteCount;
    +
    +  public UploadStream(long requestId, ManagedBuffer meta, ManagedBuffer body) {
    +    super(body, false); // body is *not* included in the frame
    +    this.requestId = requestId;
    +    this.meta = meta;
    +    bodyByteCount = body.size();
    +  }
    +
    +  // this version is called when decoding the bytes on the receiving end.  The body is handled
    +  // separately.
    +  private UploadStream(long requestId, ManagedBuffer meta, long bodyByteCount) {
    +    super(null, false);
    +    this.requestId = requestId;
    +    this.meta = meta;
    +    this.bodyByteCount = bodyByteCount;
    +  }
    +
    +  @Override
    +  public Type type() { return Type.UploadStream; }
    +
    +  @Override
    +  public int encodedLength() {
    +    // the requestId, meta size, meta and bodyByteCount (body is not included)
    +    return 8 + 4 + ((int) meta.size()) + 8;
    +  }
    +
    +  @Override
    +  public void encode(ByteBuf buf) {
    +    buf.writeLong(requestId);
    +    try {
    +      ByteBuffer metaBuf = meta.nioByteBuffer();
    +      buf.writeInt(metaBuf.remaining());
    +      buf.writeBytes(metaBuf);
    +    } catch (IOException io) {
    +      throw new RuntimeException(io);
    +    }
    +    buf.writeLong(bodyByteCount);
    +  }
    +
    +  public static UploadStream decode(ByteBuf buf) {
    +    long requestId = buf.readLong();
    +    int metaSize = buf.readInt();
    +    ManagedBuffer meta = new NettyManagedBuffer(buf.readRetainedSlice(metaSize));
    +    long bodyByteCount = buf.readLong();
    +    // This is called by the frame decoder, so the data is still null.  We need a StreamInterceptor
    +    // to read the data.
    +    return new UploadStream(requestId, meta, bodyByteCount);
    +  }
    +
    +  @Override
    +  public int hashCode() {
    +    return Objects.hashCode(requestId, body());
    --- End diff --
    
    The `equals()` and `hashCode()` implementations of this `UploadStream` class appear to differ slightly: the `equals()` method only checks equality of the `requestIds`, whereas this hashCode is checking both the `requestId` and the `body()`. I'm not sure what a `ManagedBuffer`'s `hashCode()` is: the `hashCode()` might not depend on the buffer contents, in which case this could lead to false hashCode mismatches for equal requests. Should we use just `requestId` here instead?
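
A minimal sketch of the suggestion, using a hypothetical `Request` class rather than the real `UploadStream` (the class and field names here are assumptions for illustration): when `equals()` compares only the request id, `hashCode()` should hash only the request id, so equal objects always land in the same hash bucket.

```java
import java.util.HashSet;
import java.util.Set;

public class RequestKeyDemo {
  /** Hypothetical stand-in for a request message identified only by its id. */
  static final class Request {
    final long requestId;
    final byte[] body; // payload; deliberately excluded from equals/hashCode

    Request(long requestId, byte[] body) {
      this.requestId = requestId;
      this.body = body;
    }

    @Override
    public boolean equals(Object other) {
      return other instanceof Request && ((Request) other).requestId == requestId;
    }

    @Override
    public int hashCode() {
      // Hash exactly the fields that equals() compares.
      return Long.hashCode(requestId);
    }
  }

  public static void main(String[] args) {
    Request a = new Request(42L, new byte[] {1});
    Request b = new Request(42L, new byte[] {2});
    Set<Request> seen = new HashSet<>();
    seen.add(a);
    System.out.println(a.equals(b));      // prints true
    System.out.println(seen.contains(b)); // prints true
  }
}
```

If `hashCode()` also mixed in the body's identity-based hash, `seen.contains(b)` could return false even though `a.equals(b)` is true.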


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    **[Test build #91056 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91056/testReport)** for PR 21346 at commit [`3098b9c`](https://github.com/apache/spark/commit/3098b9cd9ffc29517b446bb660fe5be9f0031cc1).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3273/
    Test PASSed.


---



[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21346#discussion_r191941503
  
    --- Diff: common/network-common/src/main/java/org/apache/spark/network/server/RpcHandler.java ---
    @@ -38,15 +38,24 @@
        *
        * This method will not be called in parallel for a single TransportClient (i.e., channel).
        *
    +   * The rpc *might* included a data stream in <code>streamData</code> (eg. for uploading a large
    +   * amount of data which should not be buffered in memory here).  Any errors while handling the
    +   * streamData will lead to failing this entire connection -- all other in-flight rpcs will fail.
    --- End diff --
    
    Perhaps naive question: what are the implications of this? Is this referring to a scenario where we've multiplexed multiple asynchronous requests / responses over a single network connection? I think I understand _why_ the failure mode is as stated (we're worried about leaving non-consumed leftover data in the channel) but I just wanted to ask about the implications of failing other in-flight RPCs.


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    **[Test build #91792 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91792/testReport)** for PR 21346 at commit [`cf991a9`](https://github.com/apache/spark/commit/cf991a9d1923d6639315f5b2a75642a316d394e9).
     * This patch **fails Java style tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21346#discussion_r191976952
  
    --- Diff: common/network-common/src/main/java/org/apache/spark/network/server/StreamData.java ---
    @@ -0,0 +1,96 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.network.server;
    +
    +import java.io.IOException;
    +import java.nio.ByteBuffer;
    +
    +import org.apache.spark.network.client.RpcResponseCallback;
    +import org.apache.spark.network.client.StreamCallback;
    +import org.apache.spark.network.client.StreamInterceptor;
    +import org.apache.spark.network.util.TransportFrameDecoder;
    +
    +/**
    + * A holder for streamed data sent along with an RPC message.
    + */
    +public class StreamData {
    +
    +  private final TransportRequestHandler handler;
    +  private final TransportFrameDecoder frameDecoder;
    +  private final RpcResponseCallback rpcCallback;
    +  private final ByteBuffer meta;
    --- End diff --
    
    whoops, you're right.  I was using this at one point in the follow-on patch, then changed it and didn't fully clean this up.  thanks


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3716/
    Test PASSed.


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    **[Test build #4189 has finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/4189/testReport)** for PR 21346 at commit [`331124b`](https://github.com/apache/spark/commit/331124b125db6b59009e12249542f667a227226e).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    So, one thing that I was thinking about is whether it would be worth it to make error handling a little better here. I think this is no worse than the current status quo, and looking at the related PR I'm not sure how much better this would make things, but...
    
    The current implementation sends a "header" message + the streamed payload as a single RPC, so there's a single opportunity for the receiver to return an error. That means that if, for example, the receiver does not have enough space to store a block that is being uploaded, it can return an error, but the sender will still try to send all the block data to the receiver (which will just ignore it).
    
    I'm wondering if it would be worth trying to implement this as a couple of "chained RPCs", one that sends the metadata and a second one that streams the data. That way the receiver can error out on the first RPC and the sender can just throw away the second RPC, instead of having to transfer everything.
    
    It might create the "some state needs to be stored somewhere" problem on the receiver side, though. Haven't really thought that far yet.
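
The "chained RPCs" idea above can be sketched roughly as follows. This is only an illustration under stated assumptions, not code from the PR: `ChainedUploadSketch`, `Receiver`, `announce`, `stream`, and the token scheme are all hypothetical names, and the network is replaced by direct method calls. It shows the receiver rejecting an upload on the first (metadata) call so the sender never transfers the payload, and it also shows the per-upload state the receiver would have to keep between the two calls.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical two-phase upload: announce the metadata first, stream only if accepted. */
final class ChainedUploadSketch {

  /** Hypothetical receiver that tracks free space and the uploads it has accepted. */
  static final class Receiver {
    private long freeBytes;
    private final AtomicLong nextToken = new AtomicLong();
    // State that must survive between the two RPCs: token -> bytes reserved.
    private final Map<Long, Long> pending = new ConcurrentHashMap<>();
    long bytesReceived = 0;

    Receiver(long freeBytes) {
      this.freeBytes = freeBytes;
    }

    /** First RPC: returns a token if the block fits, or empty to reject the upload. */
    synchronized Optional<Long> announce(long blockSize) {
      if (blockSize > freeBytes) {
        return Optional.empty();
      }
      freeBytes -= blockSize;
      long token = nextToken.incrementAndGet();
      pending.put(token, blockSize);
      return Optional.of(token);
    }

    /** Second RPC: accepts the payload for a previously announced upload. */
    synchronized void stream(long token, byte[] data) {
      Long reserved = pending.remove(token);
      if (reserved == null || reserved != data.length) {
        throw new IllegalStateException("unknown token or unexpected size: " + token);
      }
      bytesReceived += data.length;
    }
  }

  /** Sender side: returns the number of payload bytes actually transferred. */
  static long upload(Receiver receiver, byte[] block) {
    Optional<Long> token = receiver.announce(block.length);
    if (!token.isPresent()) {
      return 0; // rejected on the metadata call, so the payload is never sent
    }
    receiver.stream(token.get(), block);
    return block.length;
  }
}
```

With a receiver that has 10 free bytes, a first 8-byte upload goes through and a second one is rejected before any payload is sent. The `pending` map is the "state stored somewhere" cost mentioned above; a real implementation would also need to expire entries for uploads that never arrive.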


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    btw I may have made the pull-based approach sound more complex than I meant to; I'm happy to take that approach if you think it's better.  The fact that the replication is synchronous doesn't really matter; I just meant it's not a fire-and-forget msg, so we have to set up the callbacks to confirm the block has been fetched (or that it failed).  It just seemed like extra indirection to me, and I thought it would be better to stay closer to the UploadBlock path.
    
    Are there particular reasons you think that approach would be better?  I guess the receiver can throttle the requests, but on the other hand the task on the sender will block waiting for the replication to finish (whether it succeeds or fails), so we really don't want it to wait too long.


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91136/
    Test FAILed.


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91192/
    Test PASSed.


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/105/
    Test PASSed.


---



[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21346#discussion_r191002520
  
    --- Diff: common/network-common/src/test/java/org/apache/spark/network/RpcIntegrationSuite.java ---
    @@ -193,10 +299,78 @@ public void sendOneWayMessage() throws Exception {
         }
       }
     
    +  @Test
    +  public void sendRpcWithStreamOneAtATime() throws Exception {
    +    for (String stream: StreamTestHelper.STREAMS) {
    --- End diff --
    
    space before `:`


---



[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21346#discussion_r191941962
  
    --- Diff: common/network-common/src/main/java/org/apache/spark/network/server/StreamData.java ---
    @@ -0,0 +1,96 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.network.server;
    +
    +import java.io.IOException;
    +import java.nio.ByteBuffer;
    +
    +import org.apache.spark.network.client.RpcResponseCallback;
    +import org.apache.spark.network.client.StreamCallback;
    +import org.apache.spark.network.client.StreamInterceptor;
    +import org.apache.spark.network.util.TransportFrameDecoder;
    +
    +/**
    + * A holder for streamed data sent along with an RPC message.
    + */
    +public class StreamData {
    +
    +  private final TransportRequestHandler handler;
    +  private final TransportFrameDecoder frameDecoder;
    +  private final RpcResponseCallback rpcCallback;
    +  private final ByteBuffer meta;
    --- End diff --
    
    It looks like this field is not actually used in the current implementation. Is that intentional?


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3272/
    Test PASSed.


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/90694/
    Test FAILed.


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3560/
    Test PASSed.


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    **[Test build #4194 has finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/4194/testReport)** for PR 21346 at commit [`83c3271`](https://github.com/apache/spark/commit/83c3271d2f45bbef18d865bddbc6807e9fbd2503).
     * This patch **fails Spark unit tests**.
     * This patch **does not merge cleanly**.
     * This patch adds no public classes.


---



[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21346#discussion_r195788253
  
    --- Diff: common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java ---
    @@ -216,34 +192,99 @@ public long sendRpc(ByteBuffer message, RpcResponseCallback callback) {
           logger.trace("Sending RPC to {}", getRemoteAddress(channel));
         }
     
    -    long requestId = Math.abs(UUID.randomUUID().getLeastSignificantBits());
    +    long requestId = requestId();
         handler.addRpcRequest(requestId, callback);
     
         channel.writeAndFlush(new RpcRequest(requestId, new NioManagedBuffer(message)))
    -        .addListener(future -> {
    -          if (future.isSuccess()) {
    -            long timeTaken = System.currentTimeMillis() - startTime;
    -            if (logger.isTraceEnabled()) {
    -              logger.trace("Sending request {} to {} took {} ms", requestId,
    -                getRemoteAddress(channel), timeTaken);
    -            }
    -          } else {
    -            String errorMsg = String.format("Failed to send RPC %s to %s: %s", requestId,
    -              getRemoteAddress(channel), future.cause());
    -            logger.error(errorMsg, future.cause());
    -            handler.removeRpcRequest(requestId);
    -            channel.close();
    -            try {
    -              callback.onFailure(new IOException(errorMsg, future.cause()));
    -            } catch (Exception e) {
    -              logger.error("Uncaught exception in RPC response callback handler!", e);
    -            }
    -          }
    -        });
    +      .addListener(new RpcChannelListener(startTime, requestId, callback));
     
         return requestId;
       }
     
    +  /**
    +   * Send data to the remote end as a stream.   This differs from stream() in that this is a request
    +   * to *send* data to the remote end, not to receive it from the remote.
    +   *
    +   * @param meta meta data associated with the stream, which will be read completely on the
    +   *             receiving end before the stream itself.
    +   * @param data this will be streamed to the remote end to allow for transferring large amounts
    +   *             of data without reading into memory.
    +   * @param callback handles the reply -- onSuccess will only be called when both message and data
    +   *                 are received successfully.
    +   */
    +  public long uploadStream(
    +      ManagedBuffer meta,
    +      ManagedBuffer data,
    +      RpcResponseCallback callback) {
    +    long startTime = System.currentTimeMillis();
    --- End diff --
    
    I didn't do that originally as I figured you wanted the startTime to be before `writeAndFlush`, but I can work around that too.
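
One way to keep `startTime` ahead of the write while still sharing the listener code is to capture it in the caller and hand it to a reusable listener, as the `RpcChannelListener(startTime, requestId, callback)` call in the diff suggests. The sketch below is an assumption-laden illustration, not the PR's code: `SendListenerSketch` and `TimedListener` are hypothetical names, and `java.util.concurrent.CompletableFuture` stands in for Netty's channel future so the example runs without Netty.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

/** Hypothetical reusable send listener; CompletableFuture stands in for a Netty future. */
final class SendListenerSketch {

  /** Shared completion handler. The caller captures startTime before starting the write. */
  static final class TimedListener implements BiConsumer<Void, Throwable> {
    private final long startTime;
    private final long requestId;
    private final List<String> log;

    TimedListener(long startTime, long requestId, List<String> log) {
      this.startTime = startTime;
      this.requestId = requestId;
      this.log = log;
    }

    @Override
    public void accept(Void ignored, Throwable cause) {
      if (cause == null) {
        long timeTaken = System.currentTimeMillis() - startTime;
        log.add("request " + requestId + " sent in " + timeTaken + " ms");
      } else {
        log.add("request " + requestId + " failed: " + cause.getMessage());
      }
    }
  }

  /** Runs one successful and one failed "write" through the same listener class. */
  static List<String> demo() {
    List<String> log = new ArrayList<>();

    long startTime = System.currentTimeMillis(); // captured before the write begins
    CompletableFuture<Void> okWrite = new CompletableFuture<>();
    okWrite.whenComplete(new TimedListener(startTime, 1L, log));
    okWrite.complete(null);

    CompletableFuture<Void> failedWrite = new CompletableFuture<>();
    failedWrite.whenComplete(new TimedListener(startTime, 2L, log));
    failedWrite.completeExceptionally(new IOException("connection reset"));

    return log;
  }
}
```

Because the timestamp is a constructor argument rather than something the listener reads itself, each send method stays free to decide where the clock starts.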


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    **[Test build #90693 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90693/testReport)** for PR 21346 at commit [`49e0a80`](https://github.com/apache/spark/commit/49e0a80f89433368d3a3116eb9fcd7854ceecb62).


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21346#discussion_r192797087
  
    --- Diff: common/network-common/src/main/java/org/apache/spark/network/server/RpcHandler.java ---
    @@ -38,15 +38,28 @@
        *
        * This method will not be called in parallel for a single TransportClient (i.e., channel).
        *
    +   * The rpc *might* include a data stream in <code>streamData</code> (eg. for uploading a large
    +   * amount of data which should not be buffered in memory here).  An error while reading data from
    +   * the stream ({@link org.apache.spark.network.client.StreamCallback#onData(String, ByteBuffer)})
    +   * will fail the entire channel.  A failure in "post-processing" the stream in
    +   * {@link org.apache.spark.network.client.StreamCallback#onComplete(String)} will result in an
    +   * rpcFailure, but the channel will remain active.
    +   *
    +   * If streamData is not null, you *must* call <code>streamData.registerStreamCallback</code>
    +   * before this method returns.
    +   *
        * @param client A channel client which enables the handler to make requests back to the sender
        *               of this RPC. This will always be the exact same object for a particular channel.
        * @param message The serialized bytes of the RPC.
    +   * @param streamData StreamData if there is data which is meant to be read via a StreamCallback;
    +   *                   otherwise it is null.
        * @param callback Callback which should be invoked exactly once upon success or failure of the
        *                 RPC.
        */
       public abstract void receive(
           TransportClient client,
           ByteBuffer message,
    +      StreamData streamData,
    --- End diff --
    
    moving discussion from here: https://github.com/apache/spark/pull/21451#discussion_r191628993
    
    @witgo suggested the `message` could be moved inside `streamData` -- any particular reason to do that?  It would work fine to do it that way as well, though I don't see any advantage.  I guess I'm in favor of keeping it this way.
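
For readers following the signature discussion, the contract described in the Javadoc above (a separate `message`, a possibly-null `streamData`, and a callback that must be registered before `receive` returns) can be sketched as follows. This is a simplified stand-in, not Spark's API: `ReceiveContractSketch`, `Callback`, `StreamHandle`, and `Handler` are hypothetical names, and the transport is simulated by calling the callback directly.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical model of "message plus optional stream" handling. */
final class ReceiveContractSketch {

  /** Hypothetical stand-in for a stream callback. */
  interface Callback {
    void onData(ByteBuffer chunk);
    void onComplete();
  }

  /** Hypothetical stand-in for the stream holder: keeps the registered callback. */
  static final class StreamHandle {
    private Callback callback;

    void registerStreamCallback(Callback cb) {
      this.callback = cb;
    }

    boolean hasCallback() {
      return callback != null;
    }

    // The two methods below simulate the transport delivering data.
    void deliver(ByteBuffer chunk) {
      callback.onData(chunk);
    }

    void finish() {
      callback.onComplete();
    }
  }

  /** Hypothetical handler: reads the small message eagerly, the stream lazily. */
  static final class Handler {
    String lastMessage;
    long streamedBytes = 0;
    boolean completed = false;

    void receive(ByteBuffer message, StreamHandle streamData) {
      lastMessage = StandardCharsets.UTF_8.decode(message).toString();
      if (streamData != null) {
        // Must happen before receive() returns, per the contract sketched here.
        streamData.registerStreamCallback(new Callback() {
          @Override
          public void onData(ByteBuffer chunk) {
            streamedBytes += chunk.remaining();
          }

          @Override
          public void onComplete() {
            completed = true;
          }
        });
      }
    }
  }

  /** Sends one plain RPC and one RPC followed by two streamed chunks. */
  static Handler demo() {
    Handler handler = new Handler();
    handler.receive(ByteBuffer.wrap("plain".getBytes(StandardCharsets.UTF_8)), null);

    StreamHandle stream = new StreamHandle();
    handler.receive(ByteBuffer.wrap("upload".getBytes(StandardCharsets.UTF_8)), stream);
    if (!stream.hasCallback()) {
      throw new IllegalStateException("handler did not register a stream callback");
    }
    stream.deliver(ByteBuffer.allocate(4));
    stream.deliver(ByteBuffer.allocate(6));
    stream.finish();
    return handler;
  }
}
```

Keeping `message` as its own parameter means a plain RPC and a streamed RPC share the same first step, and only the null check differs.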


---



[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21346#discussion_r191938203
  
    --- Diff: common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java ---
    @@ -141,26 +141,14 @@ public void fetchChunk(
         StreamChunkId streamChunkId = new StreamChunkId(streamId, chunkIndex);
         handler.addFetchRequest(streamChunkId, callback);
     
    -    channel.writeAndFlush(new ChunkFetchRequest(streamChunkId)).addListener(future -> {
    --- End diff --
    
    Are the changes to these `.addListener()` calls primarily cleanup / refactoring? Is the intent to reduce the amount of _new_ duplicate code which would otherwise be added to `uploadStream` in this file?


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    **[Test build #91854 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91854/testReport)** for PR 21346 at commit [`1a222aa`](https://github.com/apache/spark/commit/1a222aa77d2a31fd3b3ffe21edfc69ab99e80806).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3571/
    Test PASSed.


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    **[Test build #91324 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91324/testReport)** for PR 21346 at commit [`83c3271`](https://github.com/apache/spark/commit/83c3271d2f45bbef18d865bddbc6807e9fbd2503).
     * This patch **fails due to an unknown error code, -9**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21346#discussion_r191940304
  
    --- Diff: common/network-common/src/main/java/org/apache/spark/network/protocol/UploadStream.java ---
    @@ -0,0 +1,107 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.network.protocol;
    +
    +import java.io.IOException;
    +import java.nio.ByteBuffer;
    +
    +import com.google.common.base.Objects;
    +import io.netty.buffer.ByteBuf;
    +
    +import org.apache.spark.network.buffer.ManagedBuffer;
    +import org.apache.spark.network.buffer.NettyManagedBuffer;
    +
    +/**
    + * An RPC with data that is sent outside of the frame, so it can be read as a stream.
    + */
    +public final class UploadStream extends AbstractMessage implements RequestMessage {
    +  /** Used to link an RPC request with its response. */
    +  public final long requestId;
    +  public final ManagedBuffer meta;
    +  public final long bodyByteCount;
    +
    +  public UploadStream(long requestId, ManagedBuffer meta, ManagedBuffer body) {
    +    super(body, false); // body is *not* included in the frame
    +    this.requestId = requestId;
    +    this.meta = meta;
    +    bodyByteCount = body.size();
    +  }
    +
    +  // this version is called when decoding the bytes on the receiving end.  The body is handled
    +  // separately.
    +  private UploadStream(long requestId, ManagedBuffer meta, long bodyByteCount) {
    +    super(null, false);
    +    this.requestId = requestId;
    +    this.meta = meta;
    +    this.bodyByteCount = bodyByteCount;
    +  }
    +
    +  @Override
    +  public Type type() { return Type.UploadStream; }
    +
    +  @Override
    +  public int encodedLength() {
    +    // the requestId, meta size, meta and bodyByteCount (body is not included)
    +    return 8 + 4 + ((int) meta.size()) + 8;
    +  }
    +
    +  @Override
    +  public void encode(ByteBuf buf) {
    +    buf.writeLong(requestId);
    +    try {
    +      ByteBuffer metaBuf = meta.nioByteBuffer();
    +      buf.writeInt(metaBuf.remaining());
    +      buf.writeBytes(metaBuf);
    +    } catch (IOException io) {
    +      throw new RuntimeException(io);
    +    }
    +    buf.writeLong(bodyByteCount);
    +  }
    +
    +  public static UploadStream decode(ByteBuf buf) {
    +    long requestId = buf.readLong();
    +    int metaSize = buf.readInt();
    +    ManagedBuffer meta = new NettyManagedBuffer(buf.readRetainedSlice(metaSize));
    +    long bodyByteCount = buf.readLong();
    +    // This is called by the frame decoder, so the data is still null.  We need a StreamInterceptor
    +    // to read the data.
    +    return new UploadStream(requestId, meta, bodyByteCount);
    +  }
    +
    +  @Override
    +  public int hashCode() {
    +    return Objects.hashCode(requestId, body());
    +  }
    +
    +  @Override
    +  public boolean equals(Object other) {
    +    if (other instanceof UploadStream) {
    +      UploadStream o = (UploadStream) other;
    +      return requestId == o.requestId && super.equals(o);
    +    }
    +    return false;
    +  }
    +
    +  @Override
    +  public String toString() {
    +    return Objects.toStringHelper(this)
    +      .add("requestId", requestId)
    +      .add("body", body())
    --- End diff --
    
    Similar question here about whether `body()` is useful in this context: will this actually end up printing buffer contents, which are potentially huge? Or will it do something reasonable and print only the buffer type?
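
As a rough illustration of the concern, a message's `toString` can report sizes instead of buffer contents. The sketch below is an assumption, not the PR's implementation: `UploadHeaderSketch` is a hypothetical class that uses `java.nio.ByteBuffer` in place of Netty's `ByteBuf`. It mirrors the header layout in the diff above (8-byte request id, 4-byte meta length, meta bytes, 8-byte body byte count) and prints only byte counts.

```java
import java.nio.ByteBuffer;

/** Hypothetical header with the same layout as the diff; the body is never part of it. */
final class UploadHeaderSketch {
  final long requestId;
  final byte[] meta;
  final long bodyByteCount;

  UploadHeaderSketch(long requestId, byte[] meta, long bodyByteCount) {
    this.requestId = requestId;
    this.meta = meta;
    this.bodyByteCount = bodyByteCount;
  }

  /** requestId (8) + meta length (4) + meta bytes + bodyByteCount (8). */
  int encodedLength() {
    return 8 + 4 + meta.length + 8;
  }

  ByteBuffer encode() {
    ByteBuffer buf = ByteBuffer.allocate(encodedLength());
    buf.putLong(requestId);
    buf.putInt(meta.length);
    buf.put(meta);
    buf.putLong(bodyByteCount);
    buf.flip(); // switch from writing to reading
    return buf;
  }

  static UploadHeaderSketch decode(ByteBuffer buf) {
    long requestId = buf.getLong();
    byte[] meta = new byte[buf.getInt()];
    buf.get(meta);
    long bodyByteCount = buf.getLong();
    return new UploadHeaderSketch(requestId, meta, bodyByteCount);
  }

  /** Prints sizes only, so logging a message never dumps a large buffer. */
  @Override
  public String toString() {
    return "UploadHeaderSketch{requestId=" + requestId
        + ", metaBytes=" + meta.length
        + ", bodyByteCount=" + bodyByteCount + "}";
  }
}
```

Whether the real `body()` prints contents depends on the `ManagedBuffer` implementation's own `toString`, which this sketch does not model.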


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3615/
    Test PASSed.


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/90693/
    Test FAILed.


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91793/
    Test PASSed.


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91792/
    Test FAILed.


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    **[Test build #92101 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92101/testReport)** for PR 21346 at commit [`fd62f61`](https://github.com/apache/spark/commit/fd62f615369e287d6deb707d6b0bfa11cfead2fe).


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    **[Test build #91121 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91121/testReport)** for PR 21346 at commit [`2fef75f`](https://github.com/apache/spark/commit/2fef75f18a115db542afe96d49b8cbe9ed534d53).
     * This patch **fails Java style tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21346#discussion_r188801902
  
    --- Diff: project/MimaExcludes.scala ---
    @@ -73,7 +73,10 @@ object MimaExcludes {
         ProblemFilters.exclude[IncompatibleTemplateDefProblem]("org.apache.spark.ml.tree.InternalNode"),
         ProblemFilters.exclude[IncompatibleTemplateDefProblem]("org.apache.spark.ml.tree.Node"),
         ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.ml.classification.DecisionTreeClassificationModel.this"),
    -    ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.ml.regression.DecisionTreeRegressionModel.this")
    +    ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.ml.regression.DecisionTreeRegressionModel.this"),
    +
    +    // [SPARK-6237][NETWORK] Network-layer changes to allow stream upload
    --- End diff --
    
    I think we started adding new exclusions at the top of the list, since that is cleaner (it doesn't require changing the previous exclusion rule just to add a trailing comma).


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3796/
    Test PASSed.


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3997/
    Test PASSed.


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/4036/
    Test PASSed.


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/199/
    Test PASSed.


---



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21346
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/4223/
    Test PASSed.


---
