Posted to reviews@spark.apache.org by vanzin <gi...@git.apache.org> on 2015/10/22 01:50:00 UTC

[GitHub] spark pull request: [SPARK-11235] [network] Add ability to stream ...

GitHub user vanzin opened a pull request:

    https://github.com/apache/spark/pull/9206

    [SPARK-11235] [network] Add ability to stream data using network lib.

    The current interface used to fetch shuffle data is not very efficient for
    large buffers; it requires the receiver to buffer the entirety of the
    contents being downloaded in memory before processing the data.
    
    To allow the network library to be used for transferring large files (such
    as those that can be added using SparkContext addJar / addFile), this change
    adds a more efficient way of downloading data: the data is streamed and fed
    to a callback as it arrives.
    
    This is achieved by a custom frame decoder that replaces the current Netty
    one; this decoder can enter a mode where framing is skipped and data is
    instead provided directly to a callback. The existing Netty classes
    (ByteToMessageDecoder and LengthFieldBasedFrameDecoder) could not be reused,
    since their semantics do not allow for the interception approach the new
    decoder uses.
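The interception mode described above can be sketched without Netty. This is a minimal, hypothetical illustration, not the code in this PR: `InterceptingFrameDecoder` and `FrameDecoderDemo` are invented names, frames are assumed to carry an 8-byte big-endian length prefix that counts only the payload, and the one-byte type tag ('S' for a stream header, 'M' for a plain message) is made up for the demo. What it tries to show is that frames are dispatched one at a time, so the handler for a stream-header frame can install an interceptor before the decoder looks at the raw, unframed stream bytes that follow.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, Netty-free sketch of a length-prefixed frame decoder with an interception mode.
class InterceptingFrameDecoder {
  /** Consumes raw bytes while installed; returns true while it still needs more data. */
  interface Interceptor { boolean handle(ByteBuffer data); }

  private static final int LENGTH_SIZE = 8; // assumed: 8-byte length that counts only the payload

  private final Consumer<ByteBuffer> frameHandler;
  private ByteBuffer pending = ByteBuffer.allocate(0); // bytes not yet consumed
  private Interceptor interceptor;

  InterceptingFrameDecoder(Consumer<ByteBuffer> frameHandler) { this.frameHandler = frameHandler; }

  void setInterceptor(Interceptor interceptor) { this.interceptor = interceptor; }

  void channelRead(ByteBuffer in) {
    // Append the new input to any bytes left over from earlier reads.
    ByteBuffer merged = ByteBuffer.allocate(pending.remaining() + in.remaining());
    merged.put(pending).put(in);
    merged.flip();
    pending = merged;

    while (pending.hasRemaining()) {
      if (interceptor != null) {
        // Interception mode: framing is skipped and raw bytes go to the interceptor.
        if (interceptor.handle(pending)) {
          break; // it still wants data, so it has consumed everything available
        }
        interceptor = null; // done: framing resumes with whatever bytes remain
        continue;
      }
      if (pending.remaining() < LENGTH_SIZE) {
        break; // length prefix not complete yet
      }
      long frameSize = pending.getLong(pending.position()); // peek without consuming
      if (frameSize < 0 || frameSize > Integer.MAX_VALUE) {
        throw new IllegalStateException("Invalid frame size: " + frameSize);
      }
      if (pending.remaining() < LENGTH_SIZE + frameSize) {
        break; // frame body not complete yet
      }
      pending.position(pending.position() + LENGTH_SIZE);
      ByteBuffer frame = pending.slice();
      frame.limit((int) frameSize);
      pending.position(pending.position() + (int) frameSize);
      // Dispatch exactly one frame, so its handler can install an interceptor
      // before the decoder looks at the bytes that follow.
      frameHandler.accept(frame);
    }
  }
}

class FrameDecoderDemo {
  // Wire input: [header frame announcing a 5-byte stream] "hello" [message frame "end"].
  static List<String> run(int chunkSize) {
    List<String> events = new ArrayList<>();
    InterceptingFrameDecoder[] decoder = new InterceptingFrameDecoder[1];
    decoder[0] = new InterceptingFrameDecoder(frame -> {
      if (frame.get() == 'S') { // made-up one-byte type tag: 'S' = stream header
        long[] left = {frame.getLong()};
        events.add("stream:" + left[0]);
        StringBuilder data = new StringBuilder();
        decoder[0].setInterceptor(buf -> {
          // Take only this stream's bytes; anything after them belongs to the next frame.
          for (; left[0] > 0 && buf.hasRemaining(); left[0]--) {
            data.append((char) buf.get()); // the demo payload is ASCII
          }
          if (left[0] == 0) {
            events.add("data:" + data);
          }
          return left[0] > 0;
        });
      } else {
        events.add("message:" + StandardCharsets.UTF_8.decode(frame));
      }
    });
    ByteBuffer wire = ByteBuffer.allocate(34);
    wire.putLong(9).put((byte) 'S').putLong(5);         // header frame, 9-byte payload
    wire.put("hello".getBytes(StandardCharsets.UTF_8)); // raw stream bytes, not framed
    wire.putLong(4).put((byte) 'M').put("end".getBytes(StandardCharsets.UTF_8));
    wire.flip();
    while (wire.hasRemaining()) { // deliver the input chunkSize bytes at a time
      ByteBuffer chunk = wire.slice();
      chunk.limit(Math.min(chunkSize, wire.remaining()));
      wire.position(wire.position() + chunk.limit());
      decoder[0].channelRead(chunk);
    }
    return events;
  }
}
```

`FrameDecoderDemo.run(1)` delivers the input one byte at a time and yields `[stream:5, data:hello, message:end]`; larger chunk sizes such as 7 or 34 yield the same events, because the interceptor takes exactly the announced number of bytes and leaves the rest for framing.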

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/vanzin/spark SPARK-11235

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/9206.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #9206
    
----
commit 73a0bb8d5fcc321f9a439d6637a1c7735e670ec1
Author: Marcelo Vanzin <va...@cloudera.com>
Date:   2015-10-19T21:41:47Z

    [SPARK-11235] [network] Add ability to stream data using network lib.
    

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-11235] [network] Add ability to stream ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9206#issuecomment-152106469
  
     Merged build triggered.


[GitHub] spark pull request: [SPARK-11235] [network] Add ability to stream ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9206#issuecomment-152108452
  
    **[Test build #44583 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44583/consoleFull)** for PR 9206 at commit [`22b8ee7`](https://github.com/apache/spark/commit/22b8ee72e9b57c216d7e9a0be24f5f0706940615).


[GitHub] spark pull request: [SPARK-11235] [network] Add ability to stream ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9206#issuecomment-152921811
  
     Merged build triggered.


[GitHub] spark pull request: [SPARK-11235] [network] Add ability to stream ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9206#issuecomment-150056546
  
    Merged build started.


[GitHub] spark pull request: [SPARK-11235] [network] Add ability to stream ...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on the pull request:

    https://github.com/apache/spark/pull/9206#issuecomment-153467662
  
    LGTM. Just a small nit about the missing doc.


[GitHub] spark pull request: [SPARK-11235] [network] Add ability to stream ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9206#issuecomment-153474396
  
    **[Test build #44951 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44951/consoleFull)** for PR 9206 at commit [`4d0ff67`](https://github.com/apache/spark/commit/4d0ff6768bd9d94e19bd842d13d5dc18b3eae6c6).


[GitHub] spark pull request: [SPARK-11235] [network] Add ability to stream ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9206#issuecomment-153472421
  
     Merged build triggered.


[GitHub] spark pull request: [SPARK-11235] [network] Add ability to stream ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9206#issuecomment-152791785
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44742/


[GitHub] spark pull request: [SPARK-11235] [network] Add ability to stream ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9206#issuecomment-152922663
  
    **[Test build #44795 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44795/consoleFull)** for PR 9206 at commit [`00a78ad`](https://github.com/apache/spark/commit/00a78ad9a9a7c747ed7caf0000e0c64f45add470).


[GitHub] spark pull request: [SPARK-11235] [network] Add ability to stream ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9206#issuecomment-152108537
  
    Merged build finished. Test FAILed.


[GitHub] spark pull request: [SPARK-11235] [network] Add ability to stream ...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the pull request:

    https://github.com/apache/spark/pull/9206#issuecomment-152118828
  
    retest this please


[GitHub] spark pull request: [SPARK-11235] [network] Add ability to stream ...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9206#discussion_r43710920
  
    --- Diff: network/common/src/main/java/org/apache/spark/network/client/StreamInterceptor.java ---
    @@ -0,0 +1,76 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.network.client;
    +
    +import java.nio.ByteBuffer;
    +import java.nio.channels.ClosedChannelException;
    +
    +import io.netty.buffer.ByteBuf;
    +
    +import org.apache.spark.network.util.TransportFrameDecoder;
    +
    +/**
    + * An interceptor that is registered with the frame decoder to feed stream data to a
    + * callback.
    + */
    +class StreamInterceptor implements TransportFrameDecoder.Interceptor {
    +
    +  private final String streamId;
    +  private final long byteCount;
    +  private final StreamCallback callback;
    +
    +  private volatile long bytesRead;
    +
    +  StreamInterceptor(String streamId, long byteCount, StreamCallback callback) {
    +    this.streamId = streamId;
    +    this.byteCount = byteCount;
    +    this.callback = callback;
    +    this.bytesRead = 0;
    +  }
    +
    +  @Override
    +  public void exceptionCaught(Throwable cause) throws Exception {
    +    callback.onFailure(streamId, cause);
    +  }
    +
    +  @Override
    +  public void channelInactive() throws Exception {
    +    callback.onFailure(streamId, new ClosedChannelException());
    +  }
    +
    +  @Override
    +  public boolean handle(ByteBuf buf) throws Exception {
    +    int toRead = (int) Math.min(buf.readableBytes(), byteCount - bytesRead);
    +    ByteBuffer nioBuffer = buf.readSlice(toRead).nioBuffer();
    +
    +    int available = nioBuffer.remaining();
    +    callback.onData(streamId, nioBuffer);
    +    bytesRead += available;
    +    if (bytesRead > byteCount) {
    +      RuntimeException re = new IllegalStateException(String.format(
    --- End diff --
    
    More of a sanity check, just in case.
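The reply above refers to the `bytesRead > byteCount` check in `handle()`. Below is a hedged, Netty-free restatement of that accounting, using `java.nio.ByteBuffer` in place of Netty's `ByteBuf`. `StreamByteCounter` and the `BiConsumer` standing in for `StreamCallback.onData` are hypothetical, and the return convention (true while more bytes are expected) is an assumption, since the diff excerpt is cut off before the method returns. Because `toRead` is capped by `byteCount - bytesRead`, the check can only fire if the accounting itself is broken, which is why it reads as a sanity check. The sketch also keeps the diff's ordering of capturing the byte count before invoking the callback.

```java
import java.nio.ByteBuffer;
import java.util.function.BiConsumer;

// Netty-free sketch of the byte accounting in StreamInterceptor.handle(); names are hypothetical.
class StreamByteCounter {
  private final String streamId;
  private final long byteCount;
  private final BiConsumer<String, ByteBuffer> onData; // stand-in for StreamCallback.onData
  private long bytesRead;

  StreamByteCounter(String streamId, long byteCount, BiConsumer<String, ByteBuffer> onData) {
    this.streamId = streamId;
    this.byteCount = byteCount;
    this.onData = onData;
  }

  /** Returns true while more stream bytes are expected (an assumed return convention). */
  boolean handle(ByteBuffer buf) {
    // Never take more than the stream still owes; later bytes belong to the next frame.
    int toRead = (int) Math.min(buf.remaining(), byteCount - bytesRead);
    ByteBuffer slice = buf.slice();
    slice.limit(toRead);
    buf.position(buf.position() + toRead);

    // Record the size before the callback runs: the callback may consume the slice,
    // after which slice.remaining() would no longer reflect what was delivered.
    int available = slice.remaining();
    onData.accept(streamId, slice);
    bytesRead += available;

    // Sanity check only: the min() above should make this unreachable.
    if (bytesRead > byteCount) {
      throw new IllegalStateException(String.format(
          "Read %d bytes for stream %s, expected at most %d", bytesRead, streamId, byteCount));
    }
    return bytesRead < byteCount;
  }

  long bytesRead() {
    return bytesRead;
  }
}
```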


[GitHub] spark pull request: [SPARK-11235] [network] Add ability to stream ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9206#issuecomment-152690740
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44724/


[GitHub] spark pull request: [SPARK-11235] [network] Add ability to stream ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9206#issuecomment-152107187
  
    Merged build started.


[GitHub] spark pull request: [SPARK-11235] [network] Add ability to stream ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9206#issuecomment-152784108
  
     Merged build triggered.


[GitHub] spark pull request: [SPARK-11235] [network] Add ability to stream ...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9206#discussion_r43279805
  
    --- Diff: network/common/src/main/java/org/apache/spark/network/client/StreamCallback.java ---
    @@ -0,0 +1,40 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.network.client;
    +
    +import java.io.IOException;
    +import java.nio.ByteBuffer;
    +
    +/**
    + * Callback for streaming data. Stream data will be offered to the {@link #onData(String, ByteBuffer)}
    + * method as it arrives. Once all the stream data is received, {@link #onComplete(String)} will be
    + * called.
    + * <p/>
    + * The network library guarantees that a single thread will call these methods at a time, but
    + * different calls may be made by different threads.
    --- End diff --
    
    I'm not sure I understand what this means -- are you saying that, e.g., only one thread will be inside `onData` at a time, but another thread might be in `onComplete`?
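One way to read the Javadoc under discussion: calls to the callback never overlap, so an implementation needs no lock, but consecutive calls may arrive on different threads, so state carried from one call to the next should be safely published. The sketch below assumes that reading. `StreamCallbackSketch` only mirrors the methods this PR calls (`onData` and `onFailure`, plus an `onComplete(String)` whose signature is assumed), and `ChannelWritingCallback` is a hypothetical example that writes a stream into a `WritableByteChannel`, for instance a file fetched through addFile. The fields are declared `volatile`, as `bytesRead` is in `StreamInterceptor`.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;

// Hypothetical mirror of the callback methods used in this PR (onComplete's signature is assumed).
interface StreamCallbackSketch {
  void onData(String streamId, ByteBuffer buf) throws IOException;
  void onComplete(String streamId) throws IOException;
  void onFailure(String streamId, Throwable cause) throws IOException;
}

// Writes stream data to a channel. Assumes calls never overlap but may come from
// different threads, so no lock is taken while fields are volatile for visibility.
class ChannelWritingCallback implements StreamCallbackSketch {
  private final WritableByteChannel sink;
  private volatile long bytesWritten; // "+=" is safe only because calls never overlap
  private volatile boolean done;
  private volatile Throwable failure;

  ChannelWritingCallback(WritableByteChannel sink) {
    this.sink = sink;
  }

  @Override
  public void onData(String streamId, ByteBuffer buf) throws IOException {
    // Drain the buffer before returning rather than keeping a reference to it.
    while (buf.hasRemaining()) {
      bytesWritten += sink.write(buf);
    }
  }

  @Override
  public void onComplete(String streamId) throws IOException {
    sink.close();
    done = true;
  }

  @Override
  public void onFailure(String streamId, Throwable cause) throws IOException {
    failure = cause;
    sink.close();
  }

  long bytesWritten() { return bytesWritten; }
  boolean isDone() { return done; }
  Throwable failure() { return failure; }
}
```

If the guarantee were weaker (calls could overlap), the `+=` on `bytesWritten` and the writes to `sink` would need a lock or an atomic counter; the design choice here depends entirely on the non-overlapping reading.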


[GitHub] spark pull request: [SPARK-11235] [network] Add ability to stream ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9206#issuecomment-152134493
  
    **[Test build #44583 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44583/consoleFull)** for PR 9206 at commit [`22b8ee7`](https://github.com/apache/spark/commit/22b8ee72e9b57c216d7e9a0be24f5f0706940615).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
       * `class StreamInterceptor implements TransportFrameDecoder.Interceptor`
       * `public final class ChunkFetchSuccess extends ResponseWithBody`
       * `public abstract class ResponseWithBody implements ResponseMessage`
       * `public final class StreamFailure implements ResponseMessage`
       * `public final class StreamRequest implements RequestMessage`
       * `public final class StreamResponse extends ResponseWithBody`
       * `public class TransportFrameDecoder extends ChannelInboundHandlerAdapter`


[GitHub] spark pull request: [SPARK-11235] [network] Add ability to stream ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9206#issuecomment-152123057
  
    **[Test build #44584 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44584/consoleFull)** for PR 9206 at commit [`22b8ee7`](https://github.com/apache/spark/commit/22b8ee72e9b57c216d7e9a0be24f5f0706940615).


[GitHub] spark pull request: [SPARK-11235] [network] Add ability to stream ...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9206#discussion_r43793226
  
    --- Diff: network/common/src/main/java/org/apache/spark/network/util/TransportFrameDecoder.java ---
    @@ -0,0 +1,146 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.network.util;
    +
    +import com.google.common.base.Preconditions;
    +import io.netty.buffer.ByteBuf;
    +import io.netty.buffer.CompositeByteBuf;
    +import io.netty.channel.ChannelHandlerContext;
    +import io.netty.channel.ChannelInboundHandlerAdapter;
    +
    +/**
    + * A customized frame decoder that allows intercepting raw data.
    + * <p>
    + * This behaves like Netty's frame decoder (with hardcoded parameters that match this library's
    + * needs), except it allows an interceptor to be installed to read data directly before it's
    + * framed.
    + * <p>
    + * Unlike Netty's frame decoder, each frame is dispatched to child handlers as soon as it's
    + * decoded, instead of building as many frames as the current buffer allows and dispatching
    + * all of them. This allows a child handler to install an interceptor if needed.
    + * <p>
    + * If an interceptor is installed, framing stops, and data is instead fed directly to the
    + * interceptor. When the interceptor indicates that it doesn't need to read any more data,
    + * framing resumes. Interceptors should not hold references to the data buffers provided
    + * to their handle() method.
    + */
    +public class TransportFrameDecoder extends ChannelInboundHandlerAdapter {
    +
    +  public static final String HANDLER_NAME = "frameDecoder";
    +  private static final int LENGTH_SIZE = 8;
    +  private static final int MAX_FRAME_SIZE = Integer.MAX_VALUE;
    --- End diff --
    
    It seems weird to use 8 bytes for an `int` length.
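The two constants in question can be illustrated with a small, hypothetical encoder/decoder: the length travels as an 8-byte `long` (`LENGTH_SIZE = 8`), but the decoder rejects any value it cannot narrow to an `int` (`MAX_FRAME_SIZE = Integer.MAX_VALUE`). `FrameHeader`, `encode`, and `decodeLength` are invented for this sketch, and the length is assumed here to count only the payload, which may differ from the library's actual wire format. One practical reason for an `int` cap, whatever the field width, is that a single `ByteBuffer` is sized and indexed with `int`s.

```java
import java.nio.ByteBuffer;

// Hypothetical helper: an 8-byte length prefix whose value must still fit in an int.
class FrameHeader {
  static final int LENGTH_SIZE = 8;                   // bytes used on the wire for the length
  static final int MAX_FRAME_SIZE = Integer.MAX_VALUE; // largest accepted frame length

  // Writes [8-byte big-endian payload length][payload] into a buffer ready for reading.
  static ByteBuffer encode(byte[] payload) {
    ByteBuffer frame = ByteBuffer.allocate(LENGTH_SIZE + payload.length);
    frame.putLong(payload.length).put(payload);
    frame.flip();
    return frame;
  }

  // Reads the 8-byte length and narrows it to int, rejecting values an int cannot hold.
  static int decodeLength(ByteBuffer frame) {
    long size = frame.getLong();
    if (size < 0 || size > MAX_FRAME_SIZE) {
      throw new IllegalArgumentException("Invalid frame size: " + size);
    }
    return (int) size;
  }
}
```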


[GitHub] spark pull request: [SPARK-11235] [network] Add ability to stream ...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the pull request:

    https://github.com/apache/spark/pull/9206#issuecomment-153795097
  
    Merging.


[GitHub] spark pull request: [SPARK-11235] [network] Add ability to stream ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9206#issuecomment-152791784
  
    Merged build finished. Test PASSed.


[GitHub] spark pull request: [SPARK-11235] [network] Add ability to stream ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9206#issuecomment-152687246
  
    Merged build started.


[GitHub] spark pull request: [SPARK-11235] [network] Add ability to stream ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9206#issuecomment-152948180
  
    Merged build finished. Test PASSed.


[GitHub] spark pull request: [SPARK-11235] [network] Add ability to stream ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9206#issuecomment-152107173
  
     Merged build triggered.


[GitHub] spark pull request: [SPARK-11235] [network] Add ability to stream ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9206#issuecomment-152461553
  
    Merged build started.


[GitHub] spark pull request: [SPARK-11235] [network] Add ability to stream ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9206#issuecomment-150057544
  
    **[Test build #44102 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44102/consoleFull)** for PR 9206 at commit [`73a0bb8`](https://github.com/apache/spark/commit/73a0bb8d5fcc321f9a439d6637a1c7735e670ec1).


[GitHub] spark pull request: [SPARK-11235] [network] Add ability to stream ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9206#issuecomment-150077052
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44102/
    Test PASSed.


[GitHub] spark pull request: [SPARK-11235] [network] Add ability to stream ...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9206#discussion_r43335096
  
    --- Diff: network/common/src/main/java/org/apache/spark/network/server/StreamManager.java ---
    @@ -47,6 +47,14 @@
       public abstract ManagedBuffer getChunk(long streamId, int chunkIndex);
     
       /**
    +   * Called in response to a stream() request. The returned data is streamed to the client
    +   * through a single TCP connection.
    +   */
    +  public ManagedBuffer openStream(String streamId) {
    --- End diff --
    
    I actually don't like how the other method (`getChunk`) uses "stream" when it doesn't really stream anything... I dislike "unframedStream" because "unframed" is irrelevant to the user; it's only there as a contrived way to differentiate the two APIs.
    
    I can make the javadoc more explicit about the lack of relationship with the other method. Maybe `getStreamBuffer`, although that has the same possibility of confusion with the other method. I do like `openStream` better than any other variant, though.
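    To make the distinction in the javadoc concrete, here is a minimal, self-contained sketch of the two lookup styles. It is not Spark's `StreamManager`: the class name `SketchStreamManager`, the `registerChunks`/`registerStream` helpers, and the use of `java.nio.ByteBuffer` in place of `ManagedBuffer` are all hypothetical. What it illustrates is that `getChunk` is keyed by a numeric stream ID plus a chunk index, while `openStream` takes an unrelated string ID and returns one buffer meant to be sent as a single stream.

```java
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the two lookup styles discussed above.
// ByteBuffer replaces Spark's ManagedBuffer; none of these names are Spark APIs.
class SketchStreamManager {
  // Chunked fetches: a numeric stream ID plus a chunk index selects one framed chunk.
  private final Map<Long, ByteBuffer[]> chunkedStreams = new ConcurrentHashMap<>();
  // Streamed fetches: a free-form string ID names one buffer sent as a single stream.
  private final Map<String, ByteBuffer> namedStreams = new ConcurrentHashMap<>();

  void registerChunks(long streamId, ByteBuffer... chunks) {
    chunkedStreams.put(streamId, chunks);
  }

  void registerStream(String streamId, ByteBuffer data) {
    namedStreams.put(streamId, data);
  }

  ByteBuffer getChunk(long streamId, int chunkIndex) {
    ByteBuffer[] chunks = chunkedStreams.get(streamId);
    if (chunks == null || chunkIndex < 0 || chunkIndex >= chunks.length) {
      throw new IllegalArgumentException("Unknown chunk " + streamId + "/" + chunkIndex);
    }
    return chunks[chunkIndex].duplicate();
  }

  ByteBuffer openStream(String streamId) {
    ByteBuffer data = namedStreams.get(streamId);
    if (data == null) {
      throw new IllegalArgumentException("Unknown stream " + streamId);
    }
    // duplicate() gives each caller an independent position/limit over the same bytes.
    return data.duplicate();
  }
}
```

    The two ID namespaces never overlap in this model, which is the "lack of relationship" the javadoc would need to spell out.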


[GitHub] spark pull request: [SPARK-11235] [network] Add ability to stream ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9206#issuecomment-152148360
  
    **[Test build #44584 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44584/consoleFull)** for PR 9206 at commit [`22b8ee7`](https://github.com/apache/spark/commit/22b8ee72e9b57c216d7e9a0be24f5f0706940615).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
       * `class StreamInterceptor implements TransportFrameDecoder.Interceptor`
       * `public final class ChunkFetchSuccess extends ResponseWithBody`
       * `public abstract class ResponseWithBody implements ResponseMessage`
       * `public final class StreamFailure implements ResponseMessage`
       * `public final class StreamRequest implements RequestMessage`
       * `public final class StreamResponse extends ResponseWithBody`
       * `public class TransportFrameDecoder extends ChannelInboundHandlerAdapter`


[GitHub] spark pull request: [SPARK-11235] [network] Add ability to stream ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9206#issuecomment-150077050
  
    Merged build finished. Test PASSed.


[GitHub] spark pull request: [SPARK-11235] [network] Add ability to stream ...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9206#discussion_r43285564
  
    --- Diff: network/common/src/main/java/org/apache/spark/network/util/TransportFrameDecoder.java ---
    @@ -0,0 +1,146 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.network.util;
    +
    +import com.google.common.base.Preconditions;
    +import io.netty.buffer.ByteBuf;
    +import io.netty.buffer.CompositeByteBuf;
    +import io.netty.channel.ChannelHandlerContext;
    +import io.netty.channel.ChannelInboundHandlerAdapter;
    +
    +/**
    + * A customized frame decoder that allows intercepting raw data.
    + * <p/>
    + * This behaves like Netty's frame decoder (with hardcoded parameters that match this library's
    + * needs), except it allows an interceptor to be installed to read data directly before it's
    + * framed.
    + * <p/>
    + * Unlike Netty's frame decoder, each frame is dispatched to child handlers as soon as it's
    + * decoded, instead of building as many frames as the current buffer allows and dispatching
    + * all of them. This allows a child handler to install an interceptor if needed.
    + * <p/>
    + * If an interceptor is installed, framing stops, and data is instead fed directly to the
    + * interceptor. When the interceptor indicates that it doesn't need to read any more data,
    + * framing resumes. Interceptors should not hold references to the data buffers provided
    + * to their handle() method.
    + */
    +public class TransportFrameDecoder extends ChannelInboundHandlerAdapter {
    --- End diff --
    
    Just an idea, dunno if this is possible or not -- instead of doing the frame decoding itself, could this delegate to a `LengthFieldBasedFrameDecoder` when there is no interceptor?
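    For weighing the delegation question, a minimal Netty-free sketch of the loop the javadoc describes may help. It assumes the 8-byte, self-inclusive length prefix from the diff and uses `java.nio.ByteBuffer` instead of `CompositeByteBuf`; the names `SketchFrameDecoder`, `Interceptor.handle` and `encode` are illustrative, not Spark's API. The property it shows is that frames are handed out one at a time, so a handler can install an interceptor before the decoder looks at the following bytes. A decoder that splits the whole buffer into frames up front (as the javadoc says Netty's does) would try to frame the raw stream bytes too.

```java
import java.nio.ByteBuffer;
import java.util.function.Consumer;

// Simplified, Netty-free model of a frame decoder with an optional interceptor.
class SketchFrameDecoder {
  // Assumed from the diff: an 8-byte length prefix whose value includes itself.
  static final int LENGTH_SIZE = 8;

  interface Interceptor {
    // Consumes bytes from buf; returns true while it still wants more data.
    boolean handle(ByteBuffer buf);
  }

  private ByteBuffer pending = ByteBuffer.allocate(0);
  private Interceptor interceptor;
  private final Consumer<byte[]> frameHandler;

  SketchFrameDecoder(Consumer<byte[]> frameHandler) {
    this.frameHandler = frameHandler;
  }

  void setInterceptor(Interceptor interceptor) {
    this.interceptor = interceptor;
  }

  void channelRead(byte[] data) {
    // Append the new bytes to whatever was left unread by previous calls.
    ByteBuffer merged = ByteBuffer.allocate(pending.remaining() + data.length);
    merged.put(pending).put(data).flip();
    pending = merged;

    while (pending.hasRemaining()) {
      if (interceptor != null) {
        // Framing is suspended: raw bytes go straight to the interceptor.
        if (!interceptor.handle(pending)) {
          interceptor = null;
        }
        continue;
      }
      byte[] frame = decodeNext();
      if (frame == null) {
        break; // not enough data for a complete frame yet
      }
      // One frame at a time, so the handler may install an interceptor
      // before the next bytes are examined.
      frameHandler.accept(frame);
    }
  }

  private byte[] decodeNext() {
    if (pending.remaining() < LENGTH_SIZE) {
      return null;
    }
    int frameLen = (int) pending.getLong(pending.position()) - LENGTH_SIZE;
    if (pending.remaining() < LENGTH_SIZE + frameLen) {
      return null;
    }
    pending.position(pending.position() + LENGTH_SIZE);
    byte[] frame = new byte[frameLen];
    pending.get(frame);
    return frame;
  }

  // Test helper: length prefix (including itself) followed by the payload.
  static byte[] encode(byte[] payload) {
    return ByteBuffer.allocate(LENGTH_SIZE + payload.length)
        .putLong(LENGTH_SIZE + payload.length).put(payload).array();
  }
}
```

    The sketch does not validate the length field, and an interceptor that returns true without consuming the available bytes could make the loop spin; a real implementation would have to guard against both.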


[GitHub] spark pull request: [SPARK-11235] [network] Add ability to stream ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9206#issuecomment-152462600
  
    **[Test build #44672 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44672/consoleFull)** for PR 9206 at commit [`a433f30`](https://github.com/apache/spark/commit/a433f30f5d4775eba916b48ea7d79fedff07160c).


[GitHub] spark pull request: [SPARK-11235] [network] Add ability to stream ...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9206#discussion_r43335302
  
    --- Diff: network/common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java ---
    @@ -50,6 +55,8 @@
     
       private final Map<Long, RpcResponseCallback> outstandingRpcs;
     
    +  private final Queue<StreamCallback> streamCallbacks;
    --- End diff --
    
    Yes, it can be a queue. Responses to the `StreamRequest` RPC being added are sent in the same order as the requests (the netty read path is single-threaded, and the handler for this message responds to it immediately instead of dispatching it to a thread pool).
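    A rough sketch of why a plain FIFO is enough, under the assumptions stated in the comment above (single-threaded read path, responses sent in request order). The class, the single-method `StreamCallback`, and `handleStreamResponse` are hypothetical simplifications; Spark's actual callback interface has more methods. A concurrent queue is used because, in this model, callbacks may be registered from a different thread than the one reading responses.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical model of matching stream responses to callbacks in FIFO order.
class SketchResponseHandler {
  // Simplified stand-in for a stream callback; not Spark's interface.
  interface StreamCallback {
    void onResponse(String streamId, long byteCount);
  }

  // Callbacks must be enqueued before the matching request is written, so the
  // queue order always equals the order requests appear on the wire.
  private final Queue<StreamCallback> streamCallbacks = new ConcurrentLinkedQueue<>();

  void addStreamCallback(StreamCallback callback) {
    streamCallbacks.offer(callback);
  }

  // Runs on the single read thread. Since the server answers stream requests in
  // the order it reads them, the head of the queue belongs to this response.
  void handleStreamResponse(String streamId, long byteCount) {
    StreamCallback callback = streamCallbacks.poll();
    if (callback == null) {
      throw new IllegalStateException("Unexpected stream response for " + streamId);
    }
    callback.onResponse(streamId, byteCount);
  }
}
```

    If the server ever answered these requests out of order (for example by handing them to a thread pool), this matching would silently pair the wrong callback with a response, which is why the ordering guarantee matters.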


[GitHub] spark pull request: [SPARK-11235] [network] Add ability to stream ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9206#issuecomment-152108538
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44582/
    Test FAILed.


[GitHub] spark pull request: [SPARK-11235] [network] Add ability to stream ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9206#issuecomment-152921824
  
    Merged build started.


[GitHub] spark pull request: [SPARK-11235] [network] Add ability to stream ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9206#issuecomment-153472447
  
    Merged build started.


[GitHub] spark pull request: [SPARK-11235] [network] Add ability to stream ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9206#issuecomment-153506214
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44951/
    Test PASSed.


[GitHub] spark pull request: [SPARK-11235] [network] Add ability to stream ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9206#issuecomment-152948027
  
    **[Test build #44795 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44795/consoleFull)** for PR 9206 at commit [`00a78ad`](https://github.com/apache/spark/commit/00a78ad9a9a7c747ed7caf0000e0c64f45add470).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
       * `class StreamInterceptor implements TransportFrameDecoder.Interceptor`
       * `public final class ChunkFetchSuccess extends ResponseWithBody`
       * `public abstract class ResponseWithBody implements ResponseMessage`
       * `public final class StreamFailure implements ResponseMessage`
       * `public final class StreamRequest implements RequestMessage`
       * `public final class StreamResponse extends ResponseWithBody`
       * `public class TransportFrameDecoder extends ChannelInboundHandlerAdapter`


[GitHub] spark pull request: [SPARK-11235] [network] Add ability to stream ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9206#issuecomment-152106496
  
    Merged build started.


[GitHub] spark pull request: [SPARK-11235] [network] Add ability to stream ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9206#issuecomment-152687416
  
    **[Test build #44724 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44724/consoleFull)** for PR 9206 at commit [`a1617a7`](https://github.com/apache/spark/commit/a1617a7b58828d2afb4088fa6d3b9064e636e884).


[GitHub] spark pull request: [SPARK-11235] [network] Add ability to stream ...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the pull request:

    https://github.com/apache/spark/pull/9206#issuecomment-153193676
  
    ping @rxin @zsxwing 
    
    This is blocking other stuff I'm working on so I'd really like to push this soon.


[GitHub] spark pull request: [SPARK-11235] [network] Add ability to stream ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9206#issuecomment-152119345
  
     Merged build triggered.


[GitHub] spark pull request: [SPARK-11235] [network] Add ability to stream ...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9206#discussion_r43357714
  
    --- Diff: network/common/src/main/java/org/apache/spark/network/util/TransportFrameDecoder.java ---
    @@ -0,0 +1,146 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.network.util;
    +
    +import com.google.common.base.Preconditions;
    +import io.netty.buffer.ByteBuf;
    +import io.netty.buffer.CompositeByteBuf;
    +import io.netty.channel.ChannelHandlerContext;
    +import io.netty.channel.ChannelInboundHandlerAdapter;
    +
    +/**
    + * A customized frame decoder that allows intercepting raw data.
    + * <p/>
    + * This behaves like Netty's frame decoder (with hardcoded parameters that match this library's
    + * needs), except it allows an interceptor to be installed to read data directly before it's
    + * framed.
    + * <p/>
    + * Unlike Netty's frame decoder, each frame is dispatched to child handlers as soon as it's
    + * decoded, instead of building as many frames as the current buffer allows and dispatching
    + * all of them. This allows a child handler to install an interceptor if needed.
    + * <p/>
    + * If an interceptor is installed, framing stops, and data is instead fed directly to the
    + * interceptor. When the interceptor indicates that it doesn't need to read any more data,
    + * framing resumes. Interceptors should not hold references to the data buffers provided
    + * to their handle() method.
    + */
    +public class TransportFrameDecoder extends ChannelInboundHandlerAdapter {
    +
    +  public static final String HANDLER_NAME = "frameDecoder";
    +  private static final int LENGTH_SIZE = 8;
    +  private static final int MAX_FRAME_SIZE = Integer.MAX_VALUE;
    +
    +  private CompositeByteBuf buffer;
    +  private volatile Interceptor interceptor;
    +
    +  @Override
    +  public void channelRead(ChannelHandlerContext ctx, Object data) throws Exception {
    +    ByteBuf in = (ByteBuf) data;
    +
    +    if (buffer == null) {
    +      buffer = in.alloc().compositeBuffer();
    +    }
    +
    +    buffer.writeBytes(in);
    +
    +    while (buffer.isReadable()) {
    +      feedInterceptor();
    +      if (interceptor != null) {
    +        continue;
    +      }
    +
    +      ByteBuf frame = decodeNext();
    +      if (frame != null) {
    +        ctx.fireChannelRead(frame);
    +      } else {
    +        break;
    +      }
    +    }
    +
    +    // We can't discard read sub-buffers if there are other references to the buffer (e.g.
    +    // through slices used for framing). This assumes that code that retains references
    +    // will call retain() from the thread that called "fireChannelRead()" above, otherwise
    +    // ref counting will go awry.
    +    if (buffer != null && buffer.refCnt() == 1) {
    +      buffer.discardReadComponents();
    +    }
    +  }
    +
    +  protected ByteBuf decodeNext() throws Exception {
    +    if (buffer.readableBytes() < LENGTH_SIZE) {
    +      return null;
    +    }
    +
    +    int frameLen = (int) buffer.readLong() - LENGTH_SIZE;
    --- End diff --
    
    Actually I'll keep my approach because then I don't need to move the index forward in case there's enough data.
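    The two ways of checking for a complete frame that are being compared here can be sketched with a plain `ByteBuffer`, whose `position` plays the role of Netty's reader index. This is an illustration only: `SketchLengthPrefix` and its methods are hypothetical, and the "peek" variant is assumed to be the alternative raised in review (reading the length with an absolute get instead of `readLong()`).

```java
import java.nio.ByteBuffer;

// Two hypothetical ways to check whether a complete length-prefixed frame is buffered.
class SketchLengthPrefix {
  static final int LENGTH_SIZE = 8;

  // Read-then-rewind: consume the length and move the position back only if the
  // frame is incomplete. When it is complete, the prefix is already skipped.
  static ByteBuffer readThenRewind(ByteBuffer buf) {
    if (buf.remaining() < LENGTH_SIZE) {
      return null;
    }
    int frameLen = (int) buf.getLong() - LENGTH_SIZE;
    if (buf.remaining() < frameLen) {
      buf.position(buf.position() - LENGTH_SIZE);
      return null;
    }
    return take(buf, frameLen);
  }

  // Peek: read the length at an absolute index without moving the position, then
  // skip the prefix explicitly once the whole frame is known to be available.
  static ByteBuffer peek(ByteBuffer buf) {
    if (buf.remaining() < LENGTH_SIZE) {
      return null;
    }
    int frameLen = (int) buf.getLong(buf.position()) - LENGTH_SIZE;
    if (buf.remaining() < LENGTH_SIZE + frameLen) {
      return null;
    }
    buf.position(buf.position() + LENGTH_SIZE);
    return take(buf, frameLen);
  }

  // Returns a view of the next len bytes and advances past them.
  private static ByteBuffer take(ByteBuffer buf, int len) {
    ByteBuffer frame = buf.slice();
    frame.limit(len);
    buf.position(buf.position() + len);
    return frame;
  }
}
```

    With read-then-rewind, the common case (frame fully buffered) needs no extra index movement and only the incomplete case moves the position back; the peek variant always needs an explicit skip once the frame is complete.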


[GitHub] spark pull request: [SPARK-11235] [network] Add ability to stream ...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9206#discussion_r43335526
  
    --- Diff: network/common/src/main/java/org/apache/spark/network/util/TransportFrameDecoder.java ---
    @@ -0,0 +1,146 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.network.util;
    +
    +import com.google.common.base.Preconditions;
    +import io.netty.buffer.ByteBuf;
    +import io.netty.buffer.CompositeByteBuf;
    +import io.netty.channel.ChannelHandlerContext;
    +import io.netty.channel.ChannelInboundHandlerAdapter;
    +
    +/**
    + * A customized frame decoder that allows intercepting raw data.
    + * <p/>
    + * This behaves like Netty's frame decoder (with hardcoded parameters that match this library's
    + * needs), except it allows an interceptor to be installed to read data directly before it's
    + * framed.
    + * <p/>
    + * Unlike Netty's frame decoder, each frame is dispatched to child handlers as soon as it's
    + * decoded, instead of building as many frames as the current buffer allows and dispatching
    + * all of them. This allows a child handler to install an interceptor if needed.
    + * <p/>
    + * If an interceptor is installed, framing stops, and data is instead fed directly to the
    + * interceptor. When the interceptor indicates that it doesn't need to read any more data,
    + * framing resumes. Interceptors should not hold references to the data buffers provided
    + * to their handle() method.
    + */
    +public class TransportFrameDecoder extends ChannelInboundHandlerAdapter {
    +
    +  public static final String HANDLER_NAME = "frameDecoder";
    +  private static final int LENGTH_SIZE = 8;
    +  private static final int MAX_FRAME_SIZE = Integer.MAX_VALUE;
    +
    +  private CompositeByteBuf buffer;
    +  private volatile Interceptor interceptor;
    +
    +  @Override
    +  public void channelRead(ChannelHandlerContext ctx, Object data) throws Exception {
    +    ByteBuf in = (ByteBuf) data;
    +
    +    if (buffer == null) {
    +      buffer = in.alloc().compositeBuffer();
    +    }
    +
    +    buffer.writeBytes(in);
    +
    +    while (buffer.isReadable()) {
    +      feedInterceptor();
    +      if (interceptor != null) {
    +        continue;
    +      }
    +
    +      ByteBuf frame = decodeNext();
    +      if (frame != null) {
    +        ctx.fireChannelRead(frame);
    +      } else {
    +        break;
    +      }
    +    }
    +
    +    // We can't discard read sub-buffers if there are other references to the buffer (e.g.
    +    // through slices used for framing). This assumes that code that retains references
    +    // will call retain() from the thread that called "fireChannelRead()" above, otherwise
    +    // ref counting will go awry.
    +    if (buffer != null && buffer.refCnt() == 1) {
    +      buffer.discardReadComponents();
    +    }
    +  }
    +
    +  protected ByteBuf decodeNext() throws Exception {
    +    if (buffer.readableBytes() < LENGTH_SIZE) {
    +      return null;
    +    }
    +
    +    int frameLen = (int) buffer.readLong() - LENGTH_SIZE;
    --- End diff --
    
    I fix the reader index two lines down in that case, but I can use your approach too.


[GitHub] spark pull request: [SPARK-11235] [network] Add ability to stream ...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9206#discussion_r43709190
  
    --- Diff: network/common/src/main/java/org/apache/spark/network/client/StreamInterceptor.java ---
    @@ -0,0 +1,76 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.network.client;
    +
    +import java.nio.ByteBuffer;
    +import java.nio.channels.ClosedChannelException;
    +
    +import io.netty.buffer.ByteBuf;
    +
    +import org.apache.spark.network.util.TransportFrameDecoder;
    +
    +/**
    + * An interceptor that is registered with the frame decoder to feed stream data to a
    + * callback.
    + */
    +class StreamInterceptor implements TransportFrameDecoder.Interceptor {
    +
    +  private final String streamId;
    +  private final long byteCount;
    +  private final StreamCallback callback;
    +
    +  private volatile long bytesRead;
    +
    +  StreamInterceptor(String streamId, long byteCount, StreamCallback callback) {
    +    this.streamId = streamId;
    +    this.byteCount = byteCount;
    +    this.callback = callback;
    +    this.bytesRead = 0;
    +  }
    +
    +  @Override
    +  public void exceptionCaught(Throwable cause) throws Exception {
    +    callback.onFailure(streamId, cause);
    +  }
    +
    +  @Override
    +  public void channelInactive() throws Exception {
    +    callback.onFailure(streamId, new ClosedChannelException());
    +  }
    +
    +  @Override
    +  public boolean handle(ByteBuf buf) throws Exception {
    +    int toRead = (int) Math.min(buf.readableBytes(), byteCount - bytesRead);
    +    ByteBuffer nioBuffer = buf.readSlice(toRead).nioBuffer();
    +
    +    int available = nioBuffer.remaining();
    +    callback.onData(streamId, nioBuffer);
    +    bytesRead += available;
    +    if (bytesRead > byteCount) {
    +      RuntimeException re = new IllegalStateException(String.format(
    --- End diff --
    
    When could this happen?
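    To make the question concrete, here is a minimal sketch of the interceptor's byte accounting. It assumes plain `java.nio.ByteBuffer` in place of Netty's `ByteBuf` and a byte-array sink in place of the `StreamCallback`. `BoundedStreamReader` is a hypothetical name, and the return value follows the decoder Javadoc's description (keep intercepting until no more data is needed). Each read is clamped to `byteCount - bytesRead` before the counter is incremented, so in this model the `bytesRead > byteCount` branch is unreachable and serves only as a defensive check. Bytes past the end of the stream stay in the input so framing can resume on them.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;

// Hypothetical, Netty-free model of the interceptor's byte accounting.
class BoundedStreamReader {
  private final long byteCount;                 // stream size announced up front
  private final ByteArrayOutputStream sink = new ByteArrayOutputStream();
  private long bytesRead = 0;                   // bytes handed to the sink so far

  BoundedStreamReader(long byteCount) {
    this.byteCount = byteCount;
  }

  /** Consumes at most the bytes still owed; returns true while more stream data is expected. */
  boolean handle(ByteBuffer in) {
    // Clamp to what remains of the stream; anything after that belongs to the next frame.
    int toRead = (int) Math.min(in.remaining(), byteCount - bytesRead);
    byte[] chunk = new byte[toRead];
    in.get(chunk);                              // advances `in` by exactly toRead bytes
    sink.write(chunk, 0, chunk.length);
    bytesRead += toRead;

    if (bytesRead > byteCount) {
      // Unreachable given the clamp above; kept only as a defensive check.
      throw new IllegalStateException(
          String.format("Read %d bytes, expected at most %d.", bytesRead, byteCount));
    }
    return bytesRead < byteCount;
  }

  long bytesRead() { return bytesRead; }
  byte[] data() { return sink.toByteArray(); }
}
```

    In the example below, a 10-byte stream arrives as 6 bytes followed by 8 bytes. The second read consumes only the 4 bytes still owed and leaves the other 4 for the frame decoder.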


[GitHub] spark pull request: [SPARK-11235] [network] Add ability to stream ...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the pull request:

    https://github.com/apache/spark/pull/9206#issuecomment-152784092
  
    Unrelated failure. Retest this please.


[GitHub] spark pull request: [SPARK-11235] [network] Add ability to stream ...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9206#discussion_r43335377
  
    --- Diff: network/common/src/main/java/org/apache/spark/network/client/StreamCallback.java ---
    @@ -0,0 +1,40 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.network.client;
    +
    +import java.io.IOException;
    +import java.nio.ByteBuffer;
    +
    +/**
    + * Callback for streaming data. Stream data will be offered to the {@link onData(ByteBuffer)}
    + * method as it arrives. Once all the stream data is received, {@link onComplete()} will be
    + * called.
    + * <p/>
    + * The network library guarantees that a single thread will call these methods at a time, but
    + * different call may be made by different threads.
    --- End diff --
    
    I'm saying that only one thread will be calling this code at a time, so it doesn't need to be thread-safe. But different calls can be made from different threads, so don't do crazy things like storing state in ThreadLocals.
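    A minimal sketch of a callback that fits this contract. It assumes a hypothetical `SimpleStreamCallback` interface modeled on the `onData` / `onComplete` / `onFailure` calls made in this PR, not the exact `StreamCallback` signatures. All state lives in ordinary instance fields, never in a `ThreadLocal`. The driver `SerialDelivery`, also hypothetical, calls `onData` from a fresh thread for each chunk but waits for each call to finish before starting the next. That models "one thread at a time, not always the same thread".

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical interface modeled on the callback methods used in this PR.
interface SimpleStreamCallback {
  void onData(String streamId, ByteBuffer buf);
  void onComplete(String streamId);
  void onFailure(String streamId, Throwable cause);
}

// All state lives in ordinary instance fields (no ThreadLocal), so it is
// still there no matter which thread makes the next call.
class CollectingCallback implements SimpleStreamCallback {
  private final ByteArrayOutputStream out = new ByteArrayOutputStream();
  private volatile boolean complete;
  private volatile Throwable failure;

  @Override
  public void onData(String streamId, ByteBuffer buf) {
    // Copy the bytes out: the caller may reuse the buffer once we return.
    byte[] chunk = new byte[buf.remaining()];
    buf.get(chunk);
    out.write(chunk, 0, chunk.length);
  }

  @Override
  public void onComplete(String streamId) { complete = true; }

  @Override
  public void onFailure(String streamId, Throwable cause) { failure = cause; }

  boolean isComplete() { return complete; }
  Throwable failure() { return failure; }
  byte[] data() { return out.toByteArray(); }
}

// Hypothetical driver: each chunk is delivered on a new thread, but each call
// finishes before the next one starts.
final class SerialDelivery {
  private SerialDelivery() {}

  static void deliver(SimpleStreamCallback cb, String streamId, byte[][] chunks) {
    for (byte[] chunk : chunks) {
      ExecutorService pool = Executors.newSingleThreadExecutor();
      try {
        // Future.get() waits for the call and orders it before the next one.
        pool.submit(() -> cb.onData(streamId, ByteBuffer.wrap(chunk))).get();
      } catch (InterruptedException | ExecutionException e) {
        throw new IllegalStateException("delivery failed", e);
      } finally {
        pool.shutdown();
      }
    }
    cb.onComplete(streamId);
  }
}
```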


[GitHub] spark pull request: [SPARK-11235] [network] Add ability to stream ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9206#issuecomment-152119412
  
    Merged build started.


[GitHub] spark pull request: [SPARK-11235] [network] Add ability to stream ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9206#issuecomment-153506212
  
    Merged build finished. Test PASSed.


[GitHub] spark pull request: [SPARK-11235] [network] Add ability to stream ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9206#issuecomment-150056518
  
    Merged build triggered.


[GitHub] spark pull request: [SPARK-11235] [network] Add ability to stream ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9206#issuecomment-152784110
  
    Merged build started.


[GitHub] spark pull request: [SPARK-11235] [network] Add ability to stream ...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9206#discussion_r43389586
  
    --- Diff: network/common/src/main/java/org/apache/spark/network/util/TransportFrameDecoder.java ---
    @@ -0,0 +1,146 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.network.util;
    +
    +import com.google.common.base.Preconditions;
    +import io.netty.buffer.ByteBuf;
    +import io.netty.buffer.CompositeByteBuf;
    +import io.netty.channel.ChannelHandlerContext;
    +import io.netty.channel.ChannelInboundHandlerAdapter;
    +
    +/**
    + * A customized frame decoder that allows intercepting raw data.
    + * <p/>
    + * This behaves like Netty's frame decoder (with harcoded parameters that match this library's
    + * needs), except it allows an interceptor to be installed to read data directly before it's
    + * framed.
    + * <p/>
    + * Unlike Netty's frame decoder, each frame is dispatched to child handlers as soon as it's
    + * decoded, instead of building as many frames as the current buffer allows and dispatching
    + * all of them. This allows a child handler to install an interceptor if needed.
    + * <p/>
    + * If an interceptor is installed, framing stops, and data is instead fed directly to the
    + * interceptor. When the interceptor indicates that it doesn't need to read any more data,
    + * framing resumes. Interceptors should not hold references to the data buffers provided
    + * to their handle() method.
    + */
    +public class TransportFrameDecoder extends ChannelInboundHandlerAdapter {
    +
    +  public static final String HANDLER_NAME = "frameDecoder";
    +  private static final int LENGTH_SIZE = 8;
    +  private static final int MAX_FRAME_SIZE = Integer.MAX_VALUE;
    +
    +  private CompositeByteBuf buffer;
    +  private volatile Interceptor interceptor;
    +
    +  @Override
    +  public void channelRead(ChannelHandlerContext ctx, Object data) throws Exception {
    +    ByteBuf in = (ByteBuf) data;
    +
    +    if (buffer == null) {
    +      buffer = in.alloc().compositeBuffer();
    +    }
    +
    +    buffer.writeBytes(in);
    +
    +    while (buffer.isReadable()) {
    +      feedInterceptor();
    +      if (interceptor != null) {
    +        continue;
    +      }
    +
    +      ByteBuf frame = decodeNext();
    +      if (frame != null) {
    +        ctx.fireChannelRead(frame);
    +      } else {
    +        break;
    +      }
    +    }
    +
    +    // We can't discard read sub-buffers if there are other references to the buffer (e.g.
    +    // through slices used for framing). This assumes that code that retains references
    +    // will call retain() from the thread that called "fireChannelRead()" above, otherwise
    +    // ref counting will go awry.
    +    if (buffer != null && buffer.refCnt() == 1) {
    +      buffer.discardReadComponents();
    +    }
    +  }
    +
    +  protected ByteBuf decodeNext() throws Exception {
    +    if (buffer.readableBytes() < LENGTH_SIZE) {
    +      return null;
    +    }
    +
    +    int frameLen = (int) buffer.readLong() - LENGTH_SIZE;
    --- End diff --
    
    Doh, sorry, I totally missed that; this is fine. I guess I've just seen it done the other way around in some examples.
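    For reference, here is a sketch of the framing convention the decoder relies on. The 8-byte length prefix counts itself, which matches `buf1.writeLong(data1.length + 8)` in the test suite, so the decoder subtracts `LENGTH_SIZE` to get the payload size. `LengthPrefixedFrames` is a hypothetical helper that uses `java.nio.ByteBuffer` instead of Netty buffers, so it illustrates the arithmetic rather than the PR's implementation.

```java
import java.nio.ByteBuffer;

// Hypothetical helpers for a length prefix that includes its own 8 bytes,
// matching `buffer.readLong() - LENGTH_SIZE` in the decoder above.
final class LengthPrefixedFrames {
  static final int LENGTH_SIZE = 8;

  private LengthPrefixedFrames() {}

  static ByteBuffer encode(byte[] payload) {
    ByteBuffer frame = ByteBuffer.allocate(LENGTH_SIZE + payload.length);
    frame.putLong(LENGTH_SIZE + payload.length); // total frame size, header included
    frame.put(payload);
    frame.flip();
    return frame;
  }

  /** Returns the next frame's payload, or null if the frame is not complete yet. */
  static byte[] decodeNext(ByteBuffer in) {
    if (in.remaining() < LENGTH_SIZE) {
      return null;
    }
    in.mark();
    int frameLen = (int) (in.getLong() - LENGTH_SIZE); // payload bytes only
    if (in.remaining() < frameLen) {
      in.reset(); // un-read the length until the rest of the frame arrives
      return null;
    }
    byte[] payload = new byte[frameLen];
    in.get(payload);
    return payload;
  }
}
```

    A 3-byte payload is therefore encoded with a length field of 11. A buffer holding only the header and part of the body yields no frame and keeps its read position.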


[GitHub] spark pull request: [SPARK-11235] [network] Add ability to stream ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9206#issuecomment-150076974
  
    **[Test build #44102 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44102/consoleFull)** for PR 9206 at commit [`73a0bb8`](https://github.com/apache/spark/commit/73a0bb8d5fcc321f9a439d6637a1c7735e670ec1).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
       * `class StreamInterceptor implements TransportFrameDecoder.Interceptor`
       * `public final class ChunkFetchSuccess extends ResponseWithBody`
       * `public abstract class ResponseWithBody implements ResponseMessage`
       * `public final class StreamFailure implements ResponseMessage`
       * `public final class StreamRequest implements RequestMessage`
       * `public final class StreamResponse extends ResponseWithBody`
       * `public class TransportFrameDecoder extends ChannelInboundHandlerAdapter`
       * `case class First(child: Expression, ignoreNullsExpr: Expression) extends DeclarativeAggregate`
       * `case class Last(child: Expression, ignoreNullsExpr: Expression) extends DeclarativeAggregate`
       * `case class First(`
       * `case class FirstFunction(`
       * `case class Last(`
       * `case class LastFunction(`


[GitHub] spark pull request: [SPARK-11235] [network] Add ability to stream ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9206#issuecomment-152134636
  
    Merged build finished. Test PASSed.


[GitHub] spark pull request: [SPARK-11235] [network] Add ability to stream ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9206#issuecomment-152148481
  
    Merged build finished. Test PASSed.


[GitHub] spark pull request: [SPARK-11235] [network] Add ability to stream ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9206#issuecomment-152791764
  
    **[Test build #44742 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44742/consoleFull)** for PR 9206 at commit [`a1617a7`](https://github.com/apache/spark/commit/a1617a7b58828d2afb4088fa6d3b9064e636e884).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
       * `class StreamInterceptor implements TransportFrameDecoder.Interceptor`
       * `public final class ChunkFetchSuccess extends ResponseWithBody`
       * `public abstract class ResponseWithBody implements ResponseMessage`
       * `public final class StreamFailure implements ResponseMessage`
       * `public final class StreamRequest implements RequestMessage`
       * `public final class StreamResponse extends ResponseWithBody`
       * `public class TransportFrameDecoder extends ChannelInboundHandlerAdapter`


[GitHub] spark pull request: [SPARK-11235] [network] Add ability to stream ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9206#issuecomment-152148484
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44584/
    Test PASSed.


[GitHub] spark pull request: [SPARK-11235] [network] Add ability to stream ...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9206#discussion_r43710902
  
    --- Diff: network/common/src/main/java/org/apache/spark/network/client/StreamInterceptor.java ---
    @@ -0,0 +1,76 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.network.client;
    +
    +import java.nio.ByteBuffer;
    +import java.nio.channels.ClosedChannelException;
    +
    +import io.netty.buffer.ByteBuf;
    +
    +import org.apache.spark.network.util.TransportFrameDecoder;
    +
    +/**
    + * An interceptor that is registered with the frame decoder to feed stream data to a
    + * callback.
    + */
    +class StreamInterceptor implements TransportFrameDecoder.Interceptor {
    +
    +  private final String streamId;
    +  private final long byteCount;
    +  private final StreamCallback callback;
    +
    +  private volatile long bytesRead;
    --- End diff --
    
    It's extremely unlikely that it would cause a problem without volatile. But it's used in `handle`, which technically can be called from different threads (just not at the same time), so it doesn't hurt.


[GitHub] spark pull request: [SPARK-11235] [network] Add ability to stream ...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the pull request:

    https://github.com/apache/spark/pull/9206#issuecomment-151760553
  
    let's try more people, /cc @zsxwing 


[GitHub] spark pull request: [SPARK-11235] [network] Add ability to stream ...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9206#discussion_r43398814
  
    --- Diff: network/common/src/test/java/org/apache/spark/network/util/TransportFrameDecoderSuite.java ---
    @@ -0,0 +1,173 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.network.util;
    +
    +import java.nio.ByteBuffer;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Random;
    +
    +import io.netty.buffer.ByteBuf;
    +import io.netty.buffer.Unpooled;
    +import io.netty.channel.ChannelHandlerContext;
    +import org.junit.Test;
    +import static org.junit.Assert.*;
    +import static org.mockito.Mockito.*;
    +
    +public class TransportFrameDecoderSuite {
    +
    +  @Test
    +  public void testFrameDecoding() throws Exception {
    +    Random rnd = new Random();
    +    TransportFrameDecoder decoder = new TransportFrameDecoder();
    +    ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
    +
    +    List<ByteBuf> buffers = new ArrayList<>();
    +    for (int i = 0; i < 100; i++) {
    +      // Create two buffers; the first one will be large enough to contain the first full
    +      // frame and maybe a few bytes of the second frame.
    +      byte[] data1 = new byte[1024 * (rnd.nextInt(31) + 1)];
    +      byte[] data2 = new byte[1024 * (rnd.nextInt(31) + 1)];
    +      int totalSize = 2 * 8 + data1.length + data2.length;
    +      int size1 = 8 + data1.length + 8 * (rnd.nextInt(data2.length / 2) / 8);
    +      int size2 = totalSize - size1;
    +
    +      assertTrue(size1 >= data1.length + 8);
    +      assertTrue(size2 > 8);
    +
    +      ByteBuf buf1 = Unpooled.buffer(size1);
    +      ByteBuf buf2 = Unpooled.buffer(size2);
    +
    +      buf1.writeLong(data1.length + 8);
    +      buf1.writeBytes(data1);
    +
    +      int remaining = size1 - data1.length - 8;
    +      assertTrue(remaining % 8 == 0);
    +      if (remaining >= 8) {
    +        buf1.writeLong(data2.length + 8);
    +        remaining -= 8;
    +      } else {
    +        buf2.writeLong(data2.length + 8);
    +      }
    +
    +      if (remaining > 0) {
    +        buf1.writeBytes(data2, 0, remaining);
    +      }
    +      buf2.writeBytes(data2, remaining, data2.length - remaining);
    +      buffers.add(buf1);
    +      buffers.add(buf2);
    +    }
    +
    +    try {
    +      for (ByteBuf buf : buffers) {
    +        decoder.channelRead(ctx, buf);
    +      }
    +
    +      verify(ctx, times(buffers.size())).fireChannelRead(any(ByteBuf.class));
    --- End diff --
    
    Check that the buffers passed to `ctx.fireChannelRead` are actually the right frames, i.e. `data1` and `data2`, not `buf1` and `buf2`, since the number of calls would be the same either way. In fact, the expectation really shouldn't be `times(buffers.size())`: even if you split the data into 3 `channelRead` calls, you'd still expect `ctx.fireChannelRead` to be called twice, right?
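    One way to follow this suggestion, sketched without Netty or Mockito: a hypothetical `AccumulatingDecoder` records each decoded payload, so a test can assert on frame contents and frame count no matter how the bytes were split across reads. It assumes the same length-prefix convention as the suite (`writeLong(data.length + 8)`). It is a stand-in for illustration, not the PR's `TransportFrameDecoder`.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, Netty-free stand-in for the frame decoder: it buffers incoming
// chunks and records each complete frame payload it decodes.
class AccumulatingDecoder {
  private static final int LENGTH_SIZE = 8;
  private ByteBuffer pending = ByteBuffer.allocate(0);
  final List<byte[]> frames = new ArrayList<>();

  void channelRead(byte[] chunk) {
    // Append the new chunk to whatever was left over from previous reads.
    ByteBuffer merged = ByteBuffer.allocate(pending.remaining() + chunk.length);
    merged.put(pending).put(chunk);
    merged.flip();
    pending = merged;

    while (pending.remaining() >= LENGTH_SIZE) {
      // Peek at the length without consuming it; it counts its own 8 bytes.
      int frameLen = (int) (pending.getLong(pending.position()) - LENGTH_SIZE);
      if (pending.remaining() < LENGTH_SIZE + frameLen) {
        break; // wait for more data
      }
      pending.position(pending.position() + LENGTH_SIZE);
      byte[] payload = new byte[frameLen];
      pending.get(payload);
      frames.add(payload);
    }
  }

  static byte[] frame(byte[] payload) {
    return ByteBuffer.allocate(LENGTH_SIZE + payload.length)
        .putLong(LENGTH_SIZE + payload.length).put(payload).array();
  }

  static byte[] concat(byte[] a, byte[] b) {
    byte[] out = Arrays.copyOf(a, a.length + b.length);
    System.arraycopy(b, 0, out, a.length, b.length);
    return out;
  }
}
```

    Splitting two frames across three reads, including a split in the middle of the second length header, should still yield exactly two frames equal to `data1` and `data2`. With Mockito, the equivalent check would capture the arguments passed to `fireChannelRead` (for example with an `ArgumentCaptor`) instead of only counting calls.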


[GitHub] spark pull request: [SPARK-11235] [network] Add ability to stream ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9206#issuecomment-152690734
  
    **[Test build #44724 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44724/consoleFull)** for PR 9206 at commit [`a1617a7`](https://github.com/apache/spark/commit/a1617a7b58828d2afb4088fa6d3b9064e636e884).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
       * `class StreamInterceptor implements TransportFrameDecoder.Interceptor`
       * `public final class ChunkFetchSuccess extends ResponseWithBody`
       * `public abstract class ResponseWithBody implements ResponseMessage`
       * `public final class StreamFailure implements ResponseMessage`
       * `public final class StreamRequest implements RequestMessage`
       * `public final class StreamResponse extends ResponseWithBody`
       * `public class TransportFrameDecoder extends ChannelInboundHandlerAdapter`


[GitHub] spark pull request: [SPARK-11235] [network] Add ability to stream ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9206#issuecomment-152489343
  
    **[Test build #44672 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44672/consoleFull)** for PR 9206 at commit [`a433f30`](https://github.com/apache/spark/commit/a433f30f5d4775eba916b48ea7d79fedff07160c).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
       * `class StreamInterceptor implements TransportFrameDecoder.Interceptor`
       * `public final class ChunkFetchSuccess extends ResponseWithBody`
       * `public abstract class ResponseWithBody implements ResponseMessage`
       * `public final class StreamFailure implements ResponseMessage`
       * `public final class StreamRequest implements RequestMessage`
       * `public final class StreamResponse extends ResponseWithBody`
       * `public class TransportFrameDecoder extends ChannelInboundHandlerAdapter`


[GitHub] spark pull request: [SPARK-11235] [network] Add ability to stream ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9206#issuecomment-152489746
  
    Merged build finished. Test PASSed.


[GitHub] spark pull request: [SPARK-11235] [network] Add ability to stream ...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9206#discussion_r43282659
  
    --- Diff: network/common/src/main/java/org/apache/spark/network/server/StreamManager.java ---
    @@ -47,6 +47,14 @@
       public abstract ManagedBuffer getChunk(long streamId, int chunkIndex);
     
       /**
    +   * Called in response to a stream() request. The returned data is streamed to the client
    +   * through a single TCP connection.
    +   */
    +  public ManagedBuffer openStream(String streamId) {
    --- End diff --
    
    I wonder if we should use another name for `streamId` to distinguish it from the `long streamId` in the other methods here and avoid confusion. Even `openStream` could be confusing, since the existing code also talks about streams a lot. "unframedStream" is the best I can think of.


[GitHub] spark pull request: [SPARK-11235] [network] Add ability to stream ...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9206#discussion_r43395471
  
    --- Diff: network/common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java ---
    @@ -50,6 +55,8 @@
     
       private final Map<Long, RpcResponseCallback> outstandingRpcs;
     
    +  private final Queue<StreamCallback> streamCallbacks;
    --- End diff --
    
    I see. Is this why the `synchronized` block is in `TransportClient#stream`? It would be good to add a comment explaining that (maybe even something brief in both places).
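A minimal sketch of the pattern under discussion, assuming (as the queue implies) that the server answers stream requests on one connection in the order it receives them. `FifoStreamClient`, `SketchStreamCallback` and the `wire` list are hypothetical stand-ins, not the library's classes. The point it illustrates: enqueueing the callback and writing the request must happen under one lock, or two concurrent callers could enqueue in one order and write in the other, handing each response to the wrong callback.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical stand-in for the library's StreamCallback.
interface SketchStreamCallback {
  void onComplete(String streamId);
}

// Hypothetical client: matches stream responses to callbacks purely by arrival order.
class FifoStreamClient {
  private final Queue<SketchStreamCallback> callbacks = new ArrayDeque<>();
  // Stands in for the channel; records the order in which requests are written.
  private final List<String> wire = new ArrayList<>();

  void stream(String streamId, SketchStreamCallback callback) {
    // Enqueueing the callback and writing the request happen under one lock, so
    // the queue order always matches the order of requests on the wire.
    synchronized (this) {
      callbacks.add(callback);
      wire.add(streamId);
    }
  }

  // Called for each stream response, in the order responses arrive.
  void handleResponse(String streamId) {
    SketchStreamCallback cb;
    synchronized (this) {
      cb = callbacks.poll();
    }
    if (cb == null) {
      throw new IllegalStateException("Unexpected response for stream " + streamId);
    }
    cb.onComplete(streamId);
  }

  synchronized List<String> requestsWritten() {
    return new ArrayList<>(wire);
  }
}
```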


[GitHub] spark pull request: [SPARK-11235] [network] Add ability to stream ...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9206#discussion_r43334824
  
    --- Diff: network/common/src/main/java/org/apache/spark/network/util/TransportFrameDecoder.java ---
    @@ -0,0 +1,146 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.network.util;
    +
    +import com.google.common.base.Preconditions;
    +import io.netty.buffer.ByteBuf;
    +import io.netty.buffer.CompositeByteBuf;
    +import io.netty.channel.ChannelHandlerContext;
    +import io.netty.channel.ChannelInboundHandlerAdapter;
    +
    +/**
    + * A customized frame decoder that allows intercepting raw data.
    + * <p/>
    + * This behaves like Netty's frame decoder (with hardcoded parameters that match this library's
    + * needs), except it allows an interceptor to be installed to read data directly before it's
    + * framed.
    + * <p/>
    + * Unlike Netty's frame decoder, each frame is dispatched to child handlers as soon as it's
    + * decoded, instead of building as many frames as the current buffer allows and dispatching
    + * all of them. This allows a child handler to install an interceptor if needed.
    + * <p/>
    + * If an interceptor is installed, framing stops, and data is instead fed directly to the
    + * interceptor. When the interceptor indicates that it doesn't need to read any more data,
    + * framing resumes. Interceptors should not hold references to the data buffers provided
    + * to their handle() method.
    + */
    +public class TransportFrameDecoder extends ChannelInboundHandlerAdapter {
    --- End diff --
    
    I tried really hard to do it that way first. But you have no control over how data arrives: a buffer may contain data from multiple frames, and as soon as the frame decoder receives that buffer, it will try to decode multiple frames and throw exceptions if the "new frame" (which contains unframed data) doesn't look like a valid frame.
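To illustrate why frames must be dispatched one at a time, here is a self-contained sketch of a length-prefixed decoder (an 8-byte length that counts itself, as in the diff) that can switch into a raw mode. It uses `java.nio.ByteBuffer` and byte arrays instead of Netty buffers, and `InterceptingFrameDecoder` / `RawDataInterceptor` are hypothetical names, not the library's API. Because the handler runs before the next frame is parsed, it can install an interceptor in time to claim the unframed bytes that follow, which a decoder that parses every frame in the buffer first cannot do.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.function.BiConsumer;

// Hypothetical interceptor contract: consume raw bytes, return true while more are needed.
interface RawDataInterceptor {
  boolean handle(ByteBuffer data);
}

// Hypothetical decoder sketch: 8-byte length prefix (including itself) plus raw mode.
class InterceptingFrameDecoder {
  private static final int LENGTH_SIZE = 8;

  private final ByteArrayOutputStream pending = new ByteArrayOutputStream();
  private final BiConsumer<InterceptingFrameDecoder, byte[]> frameHandler;
  private RawDataInterceptor interceptor;

  InterceptingFrameDecoder(BiConsumer<InterceptingFrameDecoder, byte[]> frameHandler) {
    this.frameHandler = frameHandler;
  }

  void setInterceptor(RawDataInterceptor interceptor) {
    this.interceptor = interceptor;
  }

  void channelRead(byte[] chunk) {
    pending.write(chunk, 0, chunk.length);
    ByteBuffer buf = ByteBuffer.wrap(pending.toByteArray());

    while (buf.hasRemaining()) {
      if (interceptor != null) {
        // Framing is suspended: raw bytes go straight to the interceptor.
        if (interceptor.handle(buf)) {
          break; // it consumed what was available and still wants more
        }
        interceptor = null; // done; resume framing with the leftover bytes
        continue;
      }
      if (buf.remaining() < LENGTH_SIZE) {
        break; // length header not complete yet
      }
      long frameLen = buf.getLong(buf.position()) - LENGTH_SIZE; // peek, don't consume
      if (buf.remaining() - LENGTH_SIZE < frameLen) {
        break; // frame body not complete yet
      }
      buf.position(buf.position() + LENGTH_SIZE);
      byte[] frame = new byte[(int) frameLen];
      buf.get(frame);
      // Dispatch one frame at a time, so the handler can install an interceptor
      // before the bytes after this frame are (wrongly) parsed as another frame.
      frameHandler.accept(this, frame);
    }

    // Keep only the bytes that were not consumed.
    pending.reset();
    pending.write(buf.array(), buf.position(), buf.remaining());
  }
}
```

Copying `pending` on every read keeps the sketch short but is wasteful for large backlogs; the decoder in the diff accumulates into a `CompositeByteBuf` instead.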


[GitHub] spark pull request: [SPARK-11235] [network] Add ability to stream ...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9206#discussion_r43478074
  
    --- Diff: network/common/src/test/java/org/apache/spark/network/util/TransportFrameDecoderSuite.java ---
    @@ -0,0 +1,173 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.network.util;
    +
    +import java.nio.ByteBuffer;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Random;
    +
    +import io.netty.buffer.ByteBuf;
    +import io.netty.buffer.Unpooled;
    +import io.netty.channel.ChannelHandlerContext;
    +import org.junit.Test;
    +import static org.junit.Assert.*;
    +import static org.mockito.Mockito.*;
    +
    +public class TransportFrameDecoderSuite {
    +
    +  @Test
    +  public void testFrameDecoding() throws Exception {
    +    Random rnd = new Random();
    +    TransportFrameDecoder decoder = new TransportFrameDecoder();
    +    ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
    +
    +    List<ByteBuf> buffers = new ArrayList<>();
    +    for (int i = 0; i < 100; i++) {
    +      // Create two buffers; the first one will be large enough to contain the first full
    +      // frame and maybe a few bytes of the second frame.
    +      byte[] data1 = new byte[1024 * (rnd.nextInt(31) + 1)];
    +      byte[] data2 = new byte[1024 * (rnd.nextInt(31) + 1)];
    +      int totalSize = 2 * 8 + data1.length + data2.length;
    +      int size1 = 8 + data1.length + 8 * (rnd.nextInt(data2.length / 2) / 8);
    +      int size2 = totalSize - size1;
    +
    +      assertTrue(size1 >= data1.length + 8);
    +      assertTrue(size2 > 8);
    +
    +      ByteBuf buf1 = Unpooled.buffer(size1);
    +      ByteBuf buf2 = Unpooled.buffer(size2);
    +
    +      buf1.writeLong(data1.length + 8);
    +      buf1.writeBytes(data1);
    +
    +      int remaining = size1 - data1.length - 8;
    +      assertTrue(remaining % 8 == 0);
    +      if (remaining >= 8) {
    +        buf1.writeLong(data2.length + 8);
    +        remaining -= 8;
    +      } else {
    +        buf2.writeLong(data2.length + 8);
    +      }
    +
    +      if (remaining > 0) {
    +        buf1.writeBytes(data2, 0, remaining);
    +      }
    +      buf2.writeBytes(data2, remaining, data2.length - remaining);
    +      buffers.add(buf1);
    +      buffers.add(buf2);
    +    }
    +
    +    try {
    +      for (ByteBuf buf : buffers) {
    +        decoder.channelRead(ctx, buf);
    +      }
    +
    +      verify(ctx, times(buffers.size())).fireChannelRead(any(ByteBuf.class));
    --- End diff --
    
    Yeah, I'll change it. It just happens that the number of buffers and the number of frames match, and it would be better if they didn't.
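One way to decouple the two counts, sketched here under assumptions: encode all frames into a single byte stream, re-cut it at random offsets that ignore frame boundaries, and assert on the number of frames encoded rather than the number of buffers delivered. `SimpleFrameDecoder` and `SplitStreamHarness` are hypothetical helpers using plain byte arrays, not the suite's actual code.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;

// Hypothetical minimal decoder: 8-byte length prefix that counts itself, no interceptor.
class SimpleFrameDecoder {
  private final ByteArrayOutputStream pending = new ByteArrayOutputStream();
  final List<byte[]> frames = new ArrayList<>();

  void channelRead(byte[] chunk) {
    pending.write(chunk, 0, chunk.length);
    ByteBuffer buf = ByteBuffer.wrap(pending.toByteArray());
    while (buf.remaining() >= 8) {
      long frameLen = buf.getLong(buf.position()) - 8; // peek at the header
      if (buf.remaining() - 8 < frameLen) {
        break; // wait for the rest of this frame
      }
      buf.position(buf.position() + 8);
      byte[] frame = new byte[(int) frameLen];
      buf.get(frame);
      frames.add(frame);
    }
    pending.reset();
    pending.write(buf.array(), buf.position(), buf.remaining()); // keep leftovers
  }
}

// Hypothetical helper: builds a frame stream and cuts it at random offsets.
class SplitStreamHarness {
  static List<byte[]> randomChunks(Random rnd, int frameCount) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    for (int i = 0; i < frameCount; i++) {
      byte[] body = new byte[rnd.nextInt(4096)];
      rnd.nextBytes(body);
      byte[] header = ByteBuffer.allocate(8).putLong(body.length + 8).array();
      out.write(header, 0, header.length);
      out.write(body, 0, body.length);
    }
    byte[] all = out.toByteArray();
    // Chunk boundaries ignore frame boundaries, so chunk count and frame count differ in general.
    List<byte[]> chunks = new ArrayList<>();
    int pos = 0;
    while (pos < all.length) {
      int len = Math.min(all.length - pos, 1 + rnd.nextInt(3000));
      chunks.add(Arrays.copyOfRange(all, pos, pos + len));
      pos += len;
    }
    return chunks;
  }
}
```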


[GitHub] spark pull request: [SPARK-11235] [network] Add ability to stream ...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the pull request:

    https://github.com/apache/spark/pull/9206#issuecomment-150058459
  
    /cc @rxin @aarondav 


[GitHub] spark pull request: [SPARK-11235] [network] Add ability to stream ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9206#issuecomment-152948182
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44795/
    Test PASSed.


[GitHub] spark pull request: [SPARK-11235] [network] Add ability to stream ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9206#issuecomment-152690739
  
    Merged build finished. Test FAILed.


[GitHub] spark pull request: [SPARK-11235] [network] Add ability to stream ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9206#issuecomment-152134637
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44583/
    Test PASSed.


[GitHub] spark pull request: [SPARK-11235] [network] Add ability to stream ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9206#issuecomment-152461532
  
     Merged build triggered.


[GitHub] spark pull request: [SPARK-11235] [network] Add ability to stream ...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9206#discussion_r43277641
  
    --- Diff: network/common/src/main/java/org/apache/spark/network/util/TransportFrameDecoder.java ---
    @@ -0,0 +1,146 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.network.util;
    +
    +import com.google.common.base.Preconditions;
    +import io.netty.buffer.ByteBuf;
    +import io.netty.buffer.CompositeByteBuf;
    +import io.netty.channel.ChannelHandlerContext;
    +import io.netty.channel.ChannelInboundHandlerAdapter;
    +
    +/**
    + * A customized frame decoder that allows intercepting raw data.
    + * <p/>
    + * This behaves like Netty's frame decoder (with hardcoded parameters that match this library's
    + * needs), except it allows an interceptor to be installed to read data directly before it's
    + * framed.
    + * <p/>
    + * Unlike Netty's frame decoder, each frame is dispatched to child handlers as soon as it's
    + * decoded, instead of building as many frames as the current buffer allows and dispatching
    + * all of them. This allows a child handler to install an interceptor if needed.
    + * <p/>
    + * If an interceptor is installed, framing stops, and data is instead fed directly to the
    + * interceptor. When the interceptor indicates that it doesn't need to read any more data,
    + * framing resumes. Interceptors should not hold references to the data buffers provided
    + * to their handle() method.
    + */
    +public class TransportFrameDecoder extends ChannelInboundHandlerAdapter {
    +
    +  public static final String HANDLER_NAME = "frameDecoder";
    +  private static final int LENGTH_SIZE = 8;
    +  private static final int MAX_FRAME_SIZE = Integer.MAX_VALUE;
    +
    +  private CompositeByteBuf buffer;
    +  private volatile Interceptor interceptor;
    +
    +  @Override
    +  public void channelRead(ChannelHandlerContext ctx, Object data) throws Exception {
    +    ByteBuf in = (ByteBuf) data;
    +
    +    if (buffer == null) {
    +      buffer = in.alloc().compositeBuffer();
    +    }
    +
    +    buffer.writeBytes(in);
    +
    +    while (buffer.isReadable()) {
    +      feedInterceptor();
    +      if (interceptor != null) {
    +        continue;
    +      }
    +
    +      ByteBuf frame = decodeNext();
    +      if (frame != null) {
    +        ctx.fireChannelRead(frame);
    +      } else {
    +        break;
    +      }
    +    }
    +
    +    // We can't discard read sub-buffers if there are other references to the buffer (e.g.
    +    // through slices used for framing). This assumes that code that retains references
    +    // will call retain() from the thread that called "fireChannelRead()" above, otherwise
    +    // ref counting will go awry.
    +    if (buffer != null && buffer.refCnt() == 1) {
    +      buffer.discardReadComponents();
    +    }
    +  }
    +
    +  protected ByteBuf decodeNext() throws Exception {
    +    if (buffer.readableBytes() < LENGTH_SIZE) {
    +      return null;
    +    }
    +
    +    int frameLen = (int) buffer.readLong() - LENGTH_SIZE;
    --- End diff --
    
    Should this be `buffer.getLong(buffer.readerIndex())`, so that you don't advance the buffer when you don't have enough data yet?
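The distinction is between relative and absolute reads: in Netty's `ByteBuf`, `readLong()` advances the reader index while `getLong(int index)` does not, and `java.nio.ByteBuffer` has the same split (`getLong()` vs `getLong(int)`). The sketch below uses `ByteBuffer` to stay self-contained; `LengthPeek` and its methods are hypothetical. It shows how a relative read loses the header when the body has not fully arrived, while peeking first leaves the buffer untouched until a whole frame is available.

```java
import java.nio.ByteBuffer;

// Hypothetical helper contrasting relative and absolute reads of an 8-byte length header.
class LengthPeek {
  static final int LENGTH_SIZE = 8;

  // Problematic variant: consumes the header even when the frame body is incomplete.
  static int bodyLengthIfCompleteRelative(ByteBuffer buf) {
    if (buf.remaining() < LENGTH_SIZE) return -1;
    long frameLen = buf.getLong() - LENGTH_SIZE; // relative read: advances position
    if (buf.remaining() < frameLen) return -1;    // header bytes are already gone
    return (int) frameLen;
  }

  // Peek with an absolute read; consume the header only once the whole frame is present.
  static int bodyLengthIfCompleteAbsolute(ByteBuffer buf) {
    if (buf.remaining() < LENGTH_SIZE) return -1;
    long frameLen = buf.getLong(buf.position()) - LENGTH_SIZE; // absolute read: no advance
    if (buf.remaining() - LENGTH_SIZE < frameLen) return -1;   // wait for more data
    buf.position(buf.position() + LENGTH_SIZE);               // now consume the header
    return (int) frameLen;
  }
}
```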


[GitHub] spark pull request: [SPARK-11235] [network] Add ability to stream ...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/9206


[GitHub] spark pull request: [SPARK-11235] [network] Add ability to stream ...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9206#discussion_r43793910
  
    --- Diff: network/common/src/main/java/org/apache/spark/network/util/TransportFrameDecoder.java ---
    @@ -0,0 +1,146 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.network.util;
    +
    +import com.google.common.base.Preconditions;
    +import io.netty.buffer.ByteBuf;
    +import io.netty.buffer.CompositeByteBuf;
    +import io.netty.channel.ChannelHandlerContext;
    +import io.netty.channel.ChannelInboundHandlerAdapter;
    +
    +/**
    + * A customized frame decoder that allows intercepting raw data.
    + * <p>
    + * This behaves like Netty's frame decoder (with hardcoded parameters that match this library's
    + * needs), except it allows an interceptor to be installed to read data directly before it's
    + * framed.
    + * <p>
    + * Unlike Netty's frame decoder, each frame is dispatched to child handlers as soon as it's
    + * decoded, instead of building as many frames as the current buffer allows and dispatching
    + * all of them. This allows a child handler to install an interceptor if needed.
    + * <p>
    + * If an interceptor is installed, framing stops, and data is instead fed directly to the
    + * interceptor. When the interceptor indicates that it doesn't need to read any more data,
    + * framing resumes. Interceptors should not hold references to the data buffers provided
    + * to their handle() method.
    + */
    +public class TransportFrameDecoder extends ChannelInboundHandlerAdapter {
    +
    +  public static final String HANDLER_NAME = "frameDecoder";
    +  private static final int LENGTH_SIZE = 8;
    +  private static final int MAX_FRAME_SIZE = Integer.MAX_VALUE;
    --- End diff --
    
    The network library uses a long for sizes, even though the max frame size fits in an int. Don't ask me; that's how it was before, and it can't be changed now.
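A sketch of the header arithmetic, assuming the wire format visible in the diff: an 8-byte (long) length that includes the header itself, with frames bounded by `Integer.MAX_VALUE`. `FrameHeader` is a hypothetical helper, not library code.

```java
import java.nio.ByteBuffer;

// Hypothetical helper: 8-byte (long) length header that counts the header itself.
class FrameHeader {
  static final int LENGTH_SIZE = 8;
  // Mirrors MAX_FRAME_SIZE in the diff: the long header could express more, but Java
  // arrays and buffers are int-indexed, so a frame body must fit in an int.
  static final long MAX_FRAME_SIZE = Integer.MAX_VALUE;

  static ByteBuffer encode(byte[] body) {
    ByteBuffer buf = ByteBuffer.allocate(LENGTH_SIZE + body.length);
    buf.putLong((long) LENGTH_SIZE + body.length).put(body);
    buf.flip();
    return buf;
  }

  // Validates a header value in long arithmetic, then narrows the body length to an int.
  static int bodyLength(long headerValue) {
    long frameLen = headerValue - LENGTH_SIZE;
    if (frameLen < 0 || frameLen > MAX_FRAME_SIZE) {
      throw new IllegalArgumentException("Invalid frame length: " + headerValue);
    }
    return (int) frameLen;
  }
}
```

Validating in `long` arithmetic before narrowing avoids the silent wraparound that an early `(int)` cast would cause for an oversized or corrupt header.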


[GitHub] spark pull request: [SPARK-11235] [network] Add ability to stream ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9206#issuecomment-152784398
  
    **[Test build #44742 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44742/consoleFull)** for PR 9206 at commit [`a1617a7`](https://github.com/apache/spark/commit/a1617a7b58828d2afb4088fa6d3b9064e636e884).


[GitHub] spark pull request: [SPARK-11235] [network] Add ability to stream ...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9206#discussion_r43281238
  
    --- Diff: network/common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java ---
    @@ -50,6 +55,8 @@
     
       private final Map<Long, RpcResponseCallback> outstandingRpcs;
     
    +  private final Queue<StreamCallback> streamCallbacks;
    --- End diff --
    
    Are you sure this can just be a queue? I would have expected a `Map<StreamId, StreamCallback>`. You don't know the order in which you'll get the stream responses back, right?


[GitHub] spark pull request: [SPARK-11235] [network] Add ability to stream ...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9206#discussion_r43709051
  
    --- Diff: network/common/src/main/java/org/apache/spark/network/client/StreamInterceptor.java ---
    @@ -0,0 +1,76 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.network.client;
    +
    +import java.nio.ByteBuffer;
    +import java.nio.channels.ClosedChannelException;
    +
    +import io.netty.buffer.ByteBuf;
    +
    +import org.apache.spark.network.util.TransportFrameDecoder;
    +
    +/**
    + * An interceptor that is registered with the frame decoder to feed stream data to a
    + * callback.
    + */
    +class StreamInterceptor implements TransportFrameDecoder.Interceptor {
    +
    +  private final String streamId;
    +  private final long byteCount;
    +  private final StreamCallback callback;
    +
    +  private volatile long bytesRead;
    --- End diff --
    
    Does `bytesRead` have to be `volatile`?
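Whether `volatile` is needed depends on which threads touch the field. If, as this sketch assumes, `handle()` is only ever called from the channel's event-loop thread, program order on that single thread already guarantees visibility and a plain field suffices; `volatile` would only matter if another thread read `bytesRead` (for example, to report progress). `CountingInterceptor` is a hypothetical stand-in for `StreamInterceptor`, using `java.nio.ByteBuffer` and `java.util.function` callbacks instead of Netty types.

```java
import java.nio.ByteBuffer;
import java.util.function.Consumer;

// Hypothetical interceptor: forwards exactly byteCount raw bytes, then signals completion.
class CountingInterceptor {
  private final long byteCount;
  private final Consumer<ByteBuffer> onData;
  private final Runnable onComplete;
  // Not volatile: assumed to be read and written only by the thread calling handle().
  private long bytesRead;

  CountingInterceptor(long byteCount, Consumer<ByteBuffer> onData, Runnable onComplete) {
    this.byteCount = byteCount;
    this.onData = onData;
    this.onComplete = onComplete;
  }

  // Consumes at most the bytes still owed to the stream; returns true while more are needed.
  boolean handle(ByteBuffer buf) {
    int toRead = (int) Math.min(buf.remaining(), byteCount - bytesRead);
    ByteBuffer slice = buf.slice(); // view starting at buf's current position
    slice.limit(toRead);
    buf.position(buf.position() + toRead); // leftover bytes stay in buf for framing
    bytesRead += toRead;
    onData.accept(slice);
    if (bytesRead == byteCount) {
      onComplete.run();
      return false;
    }
    return true;
  }
}
```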


[GitHub] spark pull request: [SPARK-11235] [network] Add ability to stream ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9206#issuecomment-152489749
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44672/
    Test PASSed.


[GitHub] spark pull request: [SPARK-11235] [network] Add ability to stream ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9206#issuecomment-152687242
  
     Merged build triggered.


[GitHub] spark pull request: [SPARK-11235] [network] Add ability to stream ...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9206#discussion_r43278694
  
    --- Diff: network/common/src/test/java/org/apache/spark/network/util/TransportFrameDecoderSuite.java ---
    @@ -0,0 +1,144 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.network.util;
    +
    +import java.nio.ByteBuffer;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Random;
    +
    +import io.netty.buffer.ByteBuf;
    +import io.netty.buffer.Unpooled;
    +import io.netty.channel.ChannelHandlerContext;
    +import org.junit.Test;
    +import static org.junit.Assert.*;
    +import static org.mockito.Mockito.*;
    +
    +public class TransportFrameDecoderSuite {
    +
    +  @Test
    +  public void testFrameDecoding() throws Exception {
    +    Random rnd = new Random();
    +    TransportFrameDecoder decoder = new TransportFrameDecoder();
    +    ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
    +
    +    List<ByteBuf> buffers = new ArrayList<>();
    +    for (int i = 0; i < 100; i++) {
    +      byte[] data = new byte[rnd.nextInt(32 * 1024)];
    +      buffers.add(Unpooled.copyLong(data.length + 8));
    +      buffers.add(Unpooled.wrappedBuffer(data));
    +    }
    +
    +    try {
    +      for (ByteBuf buf : buffers) {
    +        decoder.channelRead(ctx, buf);
    +      }
    +
    +      verify(ctx, times(buffers.size() / 2)).fireChannelRead(any(ByteBuf.class));
    +    } finally {
    +      for (ByteBuf buf : buffers) {
    +        buf.release();
    +      }
    +    }
    +  }
    --- End diff --
    
    can you add a test for correctly handling incomplete frames?
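One way to exercise the incomplete-frame case is to feed an encoded stream one byte at a time and check that no frame is emitted until that frame's last byte arrives. This is a minimal sketch under stated assumptions, not the PR's code. `MiniFrameDecoder` and its `feed`/`encode` methods are hypothetical names for a dependency-free analogue of the decoder, not Spark or Netty APIs. It assumes the same wire format as the diff: an 8-byte big-endian length that counts the length field itself. It also validates the length while it is still a `long`, before narrowing it to `int`.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, dependency-free analogue of length-prefixed framing; not the
// Spark TransportFrameDecoder and not a Netty API.
class MiniFrameDecoder {
  // Each frame starts with an 8-byte big-endian length that includes itself.
  static final int LENGTH_SIZE = 8;

  // Bytes received but not yet turned into complete frames.
  private final ByteArrayOutputStream pending = new ByteArrayOutputStream();
  private final List<byte[]> frames = new ArrayList<>();

  /** Accepts an arbitrary chunk of the stream, which may end mid-frame. */
  void feed(byte[] chunk) {
    pending.write(chunk, 0, chunk.length);
    ByteBuffer buf = ByteBuffer.wrap(pending.toByteArray());
    while (buf.remaining() >= LENGTH_SIZE) {
      int frameStart = buf.position();
      // Validate while the value is still a long, before narrowing to int.
      long frameLen = buf.getLong() - LENGTH_SIZE;
      if (frameLen <= 0 || frameLen >= Integer.MAX_VALUE) {
        throw new IllegalArgumentException("Bad frame length: " + frameLen);
      }
      if (buf.remaining() < frameLen) {
        // Incomplete frame: un-read the length and wait for more data.
        buf.position(frameStart);
        break;
      }
      byte[] frame = new byte[(int) frameLen];
      buf.get(frame);
      frames.add(frame);
    }
    // Keep only the unconsumed tail for the next call.
    byte[] rest = new byte[buf.remaining()];
    buf.get(rest);
    pending.reset();
    pending.write(rest, 0, rest.length);
  }

  List<byte[]> frames() {
    return frames;
  }

  /** Encodes one payload in the same wire format, for building test input. */
  static byte[] encode(byte[] payload) {
    return ByteBuffer.allocate(LENGTH_SIZE + payload.length)
        .putLong(LENGTH_SIZE + payload.length)
        .put(payload)
        .array();
  }
}
```

A test against the real `TransportFrameDecoder` could presumably follow the same pattern. It would pass small `ByteBuf` pieces to `channelRead()` and check the number of `fireChannelRead()` calls after each piece.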


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---



[GitHub] spark pull request: [SPARK-11235] [network] Add ability to stream ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9206#issuecomment-153506072
  
    **[Test build #44951 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44951/consoleFull)** for PR 9206 at commit [`4d0ff67`](https://github.com/apache/spark/commit/4d0ff6768bd9d94e19bd842d13d5dc18b3eae6c6).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
       * `class StreamInterceptor implements TransportFrameDecoder.Interceptor`
       * `public final class ChunkFetchSuccess extends ResponseWithBody`
       * `public abstract class ResponseWithBody implements ResponseMessage`
       * `public final class StreamFailure implements ResponseMessage`
       * `public final class StreamRequest implements RequestMessage`
       * `public final class StreamResponse extends ResponseWithBody`
       * `public class TransportFrameDecoder extends ChannelInboundHandlerAdapter`



[GitHub] spark pull request: [SPARK-11235] [network] Add ability to stream ...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9206#discussion_r43797572
  
    --- Diff: network/common/src/main/java/org/apache/spark/network/util/TransportFrameDecoder.java ---
    @@ -0,0 +1,146 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.network.util;
    +
    +import com.google.common.base.Preconditions;
    +import io.netty.buffer.ByteBuf;
    +import io.netty.buffer.CompositeByteBuf;
    +import io.netty.channel.ChannelHandlerContext;
    +import io.netty.channel.ChannelInboundHandlerAdapter;
    +
    +/**
    + * A customized frame decoder that allows intercepting raw data.
    + * <p>
    + * This behaves like Netty's frame decoder (with hardcoded parameters that match this library's
    + * needs), except it allows an interceptor to be installed to read data directly before it's
    + * framed.
    + * <p>
    + * Unlike Netty's frame decoder, each frame is dispatched to child handlers as soon as it's
    + * decoded, instead of building as many frames as the current buffer allows and dispatching
    + * all of them. This allows a child handler to install an interceptor if needed.
    + * <p>
    + * If an interceptor is installed, framing stops, and data is instead fed directly to the
    + * interceptor. When the interceptor indicates that it doesn't need to read any more data,
    + * framing resumes. Interceptors should not hold references to the data buffers provided
    + * to their handle() method.
    + */
    +public class TransportFrameDecoder extends ChannelInboundHandlerAdapter {
    +
    +  public static final String HANDLER_NAME = "frameDecoder";
    +  private static final int LENGTH_SIZE = 8;
    +  private static final int MAX_FRAME_SIZE = Integer.MAX_VALUE;
    +
    +  private CompositeByteBuf buffer;
    +  private volatile Interceptor interceptor;
    +
    +  @Override
    +  public void channelRead(ChannelHandlerContext ctx, Object data) throws Exception {
    +    ByteBuf in = (ByteBuf) data;
    +
    +    if (buffer == null) {
    +      buffer = in.alloc().compositeBuffer();
    +    }
    +
    +    buffer.writeBytes(in);
    +
    +    while (buffer.isReadable()) {
    +      feedInterceptor();
    +      if (interceptor != null) {
    +        continue;
    +      }
    +
    +      ByteBuf frame = decodeNext();
    +      if (frame != null) {
    +        ctx.fireChannelRead(frame);
    +      } else {
    +        break;
    +      }
    +    }
    +
    +    // We can't discard read sub-buffers if there are other references to the buffer (e.g.
    +    // through slices used for framing). This assumes that code that retains references
    +    // will call retain() from the thread that called "fireChannelRead()" above, otherwise
    +    // ref counting will go awry.
    +    if (buffer != null && buffer.refCnt() == 1) {
    +      buffer.discardReadComponents();
    +    }
    +  }
    +
    +  protected ByteBuf decodeNext() throws Exception {
    +    if (buffer.readableBytes() < LENGTH_SIZE) {
    +      return null;
    +    }
    +
    +    int frameLen = (int) buffer.readLong() - LENGTH_SIZE;
    +    if (buffer.readableBytes() < frameLen) {
    +      buffer.readerIndex(buffer.readerIndex() - LENGTH_SIZE);
    +      return null;
    +    }
    +
    +    Preconditions.checkArgument(frameLen < MAX_FRAME_SIZE, "Too large frame: %s", frameLen);
    +    Preconditions.checkArgument(frameLen > 0, "Frame length should be positive: %s", frameLen);
    +
    +    ByteBuf frame = buffer.readSlice(frameLen);
    +    frame.retain();
    +    return frame;
    +  }
    +
    +  @Override
    +  public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    +    if (buffer != null) {
    +      if (buffer.isReadable()) {
    +        feedInterceptor();
    +      }
    +      buffer.release();
    +    }
    +    if (interceptor != null) {
    +      interceptor.channelInactive();
    +    }
    +    super.channelInactive(ctx);
    +  }
    +
    +  @Override
    +  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    +    if (interceptor != null) {
    +      interceptor.exceptionCaught(cause);
    +    }
    +    super.exceptionCaught(ctx, cause);
    +  }
    +
    +  public void setInterceptor(Interceptor interceptor) {
    +    Preconditions.checkState(this.interceptor == null, "Already have an interceptor.");
    +    this.interceptor = interceptor;
    +  }
    +
    +  private void feedInterceptor() throws Exception {
    +    if (interceptor != null && !interceptor.handle(buffer)) {
    +      interceptor = null;
    +    }
    +  }
    +
    +  public static interface Interceptor {
    +
    +    boolean handle(ByteBuf data) throws Exception;
    --- End diff --
    
    nit: could you add doc for the meaning of the return value?
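In the diff, `feedInterceptor()` uninstalls the interceptor as soon as `handle()` returns `false`. So `true` means "more data expected" and `false` means "done, resume framing". Below is a minimal sketch of how that contract could be documented. It assumes a hypothetical `java.nio.ByteBuffer`-based `RawDataInterceptor` in place of the Netty `ByteBuf` interface. `FixedLengthSink` and `InterceptorDriver` are illustrative names, not part of the PR.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;

// Hypothetical ByteBuffer-based stand-in for TransportFrameDecoder.Interceptor,
// used only to illustrate how the return value of handle() could be documented.
interface RawDataInterceptor {
  /**
   * Handles raw data received while framing is suspended.
   *
   * @param data unread bytes; the interceptor must not keep a reference to it
   *     after returning, and should consume only the bytes it still needs
   * @return {@code true} if more data is expected (stay installed), or
   *     {@code false} to be uninstalled so that frame decoding resumes
   */
  boolean handle(ByteBuffer data);
}

// Illustrative interceptor that reads exactly `expected` raw bytes, then asks to
// be removed. Bytes beyond that are left in the buffer for the frame decoder.
class FixedLengthSink implements RawDataInterceptor {
  private final long expected;
  private final ByteArrayOutputStream out = new ByteArrayOutputStream();

  FixedLengthSink(long expected) {
    this.expected = expected;
  }

  @Override
  public boolean handle(ByteBuffer data) {
    int toRead = (int) Math.min(expected - out.size(), data.remaining());
    byte[] chunk = new byte[toRead];
    data.get(chunk);  // copy out, so no reference to `data` is retained
    out.write(chunk, 0, toRead);
    return out.size() < expected;
  }

  byte[] received() {
    return out.toByteArray();
  }
}

class InterceptorDriver {
  // Same rule as feedInterceptor() in the diff: drop the interceptor once
  // handle() returns false. Returns the interceptor that is still installed.
  static RawDataInterceptor feed(RawDataInterceptor interceptor, ByteBuffer data) {
    if (interceptor != null && !interceptor.handle(data)) {
      return null;
    }
    return interceptor;
  }
}
```

The sink deliberately consumes only the bytes it still needs. After it returns `false`, whatever remains in the buffer would be picked up by normal frame decoding, which matches how the diff's `channelRead()` loop falls through to `decodeNext()` once the interceptor is cleared.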

