Posted to reviews@spark.apache.org by rxin <gi...@git.apache.org> on 2014/09/09 09:32:19 UTC

[GitHub] spark pull request: [SPARK-3453] [WIP] Refactor Netty module to us...

GitHub user rxin opened a pull request:

    https://github.com/apache/spark/pull/2330

    [SPARK-3453] [WIP] Refactor Netty module to use BlockTransferService.

    Also includes some partial support for uploading blocks.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/rxin/spark netty-blockTransferService

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/2330.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2330
    
----
commit 5bb88f5fb7b02557d2c5438275b49993d0956e80
Author: Reynold Xin <rx...@apache.org>
Date:   2014-09-09T07:29:33Z

    [SPARK-3453] Refactor Netty module to use BlockTransferService.
    
    Also includes some partial support for uploading blocks.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3453] [WIP] Refactor Netty module to us...

Posted by aarondav <gi...@git.apache.org>.
Github user aarondav commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2330#discussion_r17463574
  
    --- Diff: core/src/main/scala/org/apache/spark/network/netty/protocol.scala ---
    @@ -0,0 +1,260 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.network.netty
    +
    +import java.util.{List => JList}
    +
    +import io.netty.buffer.ByteBuf
    +import io.netty.channel.ChannelHandlerContext
    +import io.netty.channel.ChannelHandler.Sharable
    +import io.netty.handler.codec._
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.network.{NettyManagedBuffer, ManagedBuffer}
    +
    +
    +/** Messages from the client to the server. */
    +sealed trait ClientRequest {
    +  def id: Byte
    +}
    +
    +/**
    + * Request to fetch a sequence of blocks from the server. A single [[BlockFetchRequest]] can
    + * correspond to multiple [[ServerResponse]]s.
    + */
    +final case class BlockFetchRequest(blocks: Seq[String]) extends ClientRequest {
    +  override def id = 0
    +}
    +
    +/**
    + * Request to upload a block to the server. Currently the server does not ack the upload request.
    + */
    +final case class BlockUploadRequest(blockId: String, data: ManagedBuffer) extends ClientRequest {
    --- End diff --
    
    Weird name. Is this blockId a requestType sort of thing? It appears to only ever be 0 or 1 right now.


[GitHub] spark pull request: [SPARK-3453] [WIP] Refactor Netty module to us...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2330#issuecomment-55482859
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20256/consoleFull) for   PR 2330 at commit [`29fe0cc`](https://github.com/apache/spark/commit/29fe0cc9ea6ca1285d054b335d989d1131aa4b69).
     * This patch merges cleanly.


[GitHub] spark pull request: [SPARK-3453] Refactor Netty module to use Bloc...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/2330#issuecomment-57220750
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/20988/


[GitHub] spark pull request: [SPARK-3453] Refactor Netty module to use Bloc...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/2330#issuecomment-57222699
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/20990/


[GitHub] spark pull request: [SPARK-3453] Refactor Netty module to use Bloc...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2330#issuecomment-57220163
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20986/consoleFull) for   PR 2330 at commit [`f23e682`](https://github.com/apache/spark/commit/f23e6821b250157adb0f2a44762972efacf78ef9).
     * This patch **passes** unit tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


[GitHub] spark pull request: [SPARK-3453] Netty-based BlockTransferService

Posted by aarondav <gi...@git.apache.org>.
Github user aarondav commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2330#discussion_r18263553
  
    --- Diff: core/src/main/scala/org/apache/spark/network/BlockDataManager.scala ---
    @@ -20,14 +20,14 @@ package org.apache.spark.network
     import org.apache.spark.storage.StorageLevel
     
     
    +private[spark]
     trait BlockDataManager {
     
       /**
    -   * Interface to get local block data.
    -   *
    -   * @return Some(buffer) if the block exists locally, and None if it doesn't.
    +   * Interface to get local block data. Throws an exception if the block cannot be found or
    +   * cannot be read successfully.
        */
    -  def getBlockData(blockId: String): Option[ManagedBuffer]
    +  def getBlockData(blockId: String): ManagedBuffer
    --- End diff --
    
    Not sure how I feel about this taking a blockId as a string, especially if it's implemented by the BlockManager itself. What's the reasoning behind not taking a BlockId? Just too many users of the API that only have a String?
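One way to address the String-vs-BlockId concern is to parse the string once, at the network boundary, and keep the data-manager interface typed. The sketch below is hypothetical: `TypedBlockId`, `RddBlock`, `ShuffleBlock`, `TypedBlockDataManager` and `InMemoryBlockDataManager` are illustrative names, the naming scheme is only modeled loosely on Spark's block names, and `Array[Byte]` stands in for `ManagedBuffer`. It keeps the new "throw if missing" contract from the diff.

```scala
// Hypothetical, simplified block ids modeled loosely on Spark's naming scheme;
// the real BlockId hierarchy has more variants.
sealed trait TypedBlockId { def name: String }

final case class RddBlock(rddId: Int, splitIndex: Int) extends TypedBlockId {
  def name: String = s"rdd_${rddId}_$splitIndex"
}

final case class ShuffleBlock(shuffleId: Int, mapId: Int, reduceId: Int) extends TypedBlockId {
  def name: String = s"shuffle_${shuffleId}_${mapId}_$reduceId"
}

object TypedBlockId {
  private val Rdd = """rdd_(\d+)_(\d+)""".r
  private val Shuffle = """shuffle_(\d+)_(\d+)_(\d+)""".r

  /** Parse a string id once, where it arrives off the wire; fail fast on anything else. */
  def parse(name: String): TypedBlockId = name match {
    case Rdd(rdd, split)                => RddBlock(rdd.toInt, split.toInt)
    case Shuffle(shuffle, map, reduce)  => ShuffleBlock(shuffle.toInt, map.toInt, reduce.toInt)
    case _ => throw new IllegalArgumentException(s"Unrecognized block id: $name")
  }
}

// Same contract as the diff (throw instead of returning Option), but keyed by a typed id.
// Array[Byte] stands in for ManagedBuffer.
trait TypedBlockDataManager {
  def getBlockData(blockId: TypedBlockId): Array[Byte]
}

class InMemoryBlockDataManager(blocks: Map[TypedBlockId, Array[Byte]])
    extends TypedBlockDataManager {
  def getBlockData(blockId: TypedBlockId): Array[Byte] =
    blocks.getOrElse(blockId, throw new NoSuchElementException(s"Block ${blockId.name} not found"))
}
```

Callers that only hold a String would call `getBlockData(TypedBlockId.parse(s))`, so malformed ids fail at the boundary instead of deep inside the block store.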


[GitHub] spark pull request: [SPARK-3453] [WIP] Refactor Netty module to us...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2330#issuecomment-55213274
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20131/consoleFull) for   PR 2330 at commit [`b32c3fe`](https://github.com/apache/spark/commit/b32c3fe568163614a6eb424e523f7dd545d8ce9e).
     * This patch **fails** unit tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


[GitHub] spark pull request: [SPARK-3453] Refactor Netty module to use Bloc...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2330#issuecomment-57260153
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21012/consoleFull) for   PR 2330 at commit [`a3a09f6`](https://github.com/apache/spark/commit/a3a09f6485950bc859b3724a20cea39fbee0be2b).
     * This patch merges cleanly.


[GitHub] spark pull request: [SPARK-3453] [WIP] Refactor Netty module to us...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2330#discussion_r17463515
  
    --- Diff: core/src/main/scala/org/apache/spark/network/netty/BlockClientFactory.scala ---
    @@ -0,0 +1,179 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.network.netty
    +
    +import java.util.concurrent.{ConcurrentHashMap, TimeoutException}
    +
    +import io.netty.bootstrap.Bootstrap
    +import io.netty.buffer.PooledByteBufAllocator
    +import io.netty.channel._
    +import io.netty.channel.epoll.{Epoll, EpollEventLoopGroup, EpollSocketChannel}
    +import io.netty.channel.nio.NioEventLoopGroup
    +import io.netty.channel.oio.OioEventLoopGroup
    +import io.netty.channel.socket.SocketChannel
    +import io.netty.channel.socket.nio.NioSocketChannel
    +import io.netty.channel.socket.oio.OioSocketChannel
    +import io.netty.util.internal.PlatformDependent
    +
    +import org.apache.spark.SparkConf
    +import org.apache.spark.util.Utils
    +
    +
    +/**
    + * Factory for creating [[BlockClient]] by using createClient.
    + *
    + * The factory maintains a connection pool to other hosts and should return the same [[BlockClient]]
    + * for the same remote host. It also shares a single worker thread pool for all [[BlockClient]]s.
    + */
    +private[netty]
    +class BlockClientFactory(val conf: NettyConfig) {
    +
    +  def this(sparkConf: SparkConf) = this(new NettyConfig(sparkConf))
    +
    +  /** A thread factory so the threads are named (for debugging). */
    +  private[this] val threadFactory = Utils.namedThreadFactory("spark-netty-client")
    +
    +  /** Socket channel type, initialized by [[init]] depending ioMode. */
    +  private[this] var socketChannelClass: Class[_ <: Channel] = _
    +
    +  /** Thread pool shared by all clients. */
    +  private[this] var workerGroup: EventLoopGroup = _
    +
    +  private[this] val connectionPool = new ConcurrentHashMap[(String, Int), BlockClient]
    +
    +  // The encoders are stateless and can be shared among multiple clients.
    +  private[this] val encoder = new ClientRequestEncoder
    +  private[this] val decoder = new ServerResponseDecoder
    +
    +  init()
    +
    +  /** Initialize [[socketChannelClass]] and [[workerGroup]] based on ioMode. */
    +  private def init(): Unit = {
    +    def initOio(): Unit = {
    +      socketChannelClass = classOf[OioSocketChannel]
    +      workerGroup = new OioEventLoopGroup(0, threadFactory)
    +    }
    +    def initNio(): Unit = {
    +      socketChannelClass = classOf[NioSocketChannel]
    +      workerGroup = new NioEventLoopGroup(0, threadFactory)
    +    }
    +    def initEpoll(): Unit = {
    +      socketChannelClass = classOf[EpollSocketChannel]
    +      workerGroup = new EpollEventLoopGroup(0, threadFactory)
    +    }
    +
    +    // For auto mode, first try epoll (only available on Linux), then nio.
    +    conf.ioMode match {
    +      case "nio" => initNio()
    +      case "oio" => initOio()
    +      case "epoll" => initEpoll()
    +      case "auto" => if (Epoll.isAvailable) initEpoll() else initNio()
    +    }
    +  }
    +
    +  /**
    +   * Create a new BlockFetchingClient connecting to the given remote host / port.
    +   *
    +   * This blocks until a connection is successfully established.
    +   *
    +   * Concurrency: This method is safe to call from multiple threads.
    +   */
    +  def createClient(remoteHost: String, remotePort: Int): BlockClient = {
    +    // Get connection from the connection pool first.
    +    // If it is not found or not active, create a new one.
    +    val cachedClient = connectionPool.get((remoteHost, remotePort))
    --- End diff --
    
    This is SPARK-3002
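The createClient comment in the diff describes the pooling rule: reuse the cached client for a (host, port) if it is still active, otherwise create a new one. Below is a minimal sketch of that rule on a `ConcurrentHashMap`, assuming Scala 2.12+ (for lambda-to-`BiFunction` conversion). `FakeClient` and `PooledClientFactory` are hypothetical stand-ins, not the PR's `BlockClient` or `BlockClientFactory`, and the race handling is one possible design rather than what the PR does.

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical stand-in for BlockClient; `isActive` mimics the state of an open channel
// and only ever goes from true to false.
final class FakeClient(val host: String, val port: Int) {
  @volatile private var active = true
  def isActive: Boolean = active
  def close(): Unit = active = false
}

/** Caches one client per (host, port); reconnects when the cached client has gone inactive. */
class PooledClientFactory(connect: (String, Int) => FakeClient) {
  private val pool = new ConcurrentHashMap[(String, Int), FakeClient]()

  def createClient(host: String, port: Int): FakeClient = {
    val key = (host, port)
    val cached = pool.get(key)
    if (cached != null && cached.isActive) {
      cached
    } else {
      // Connect outside the map's lock, then install atomically. If another thread already
      // installed a live client for this key, keep theirs and close ours.
      val fresh = connect(host, port)
      val winner = pool.compute(key, (_, existing) =>
        if (existing != null && existing.isActive) existing else fresh)
      if (winner ne fresh) fresh.close()
      winner
    }
  }
}
```

Connecting before `compute` keeps a slow connect from holding the map's bin lock, at the cost of occasionally opening and immediately closing a redundant connection when two threads race.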


[GitHub] spark pull request: [SPARK-3453] Refactor Netty module to use Bloc...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/2330#issuecomment-57223142
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/20994/


[GitHub] spark pull request: [SPARK-3453] Netty-based BlockTransferService

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2330#issuecomment-57272680
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21025/consoleFull) for   PR 2330 at commit [`0dae310`](https://github.com/apache/spark/commit/0dae31022fa26abc806db94e02fa7c15a031d1c1).
     * This patch merges cleanly.


[GitHub] spark pull request: [SPARK-3453] Refactor Netty module to use Bloc...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2330#issuecomment-57242521
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21003/consoleFull) for   PR 2330 at commit [`69f5d0a`](https://github.com/apache/spark/commit/69f5d0a2434396abbbd98886e047bc08a9e65565).
     * This patch merges cleanly.


[GitHub] spark pull request: [SPARK-3453] Netty-based BlockTransferService

Posted by aarondav <gi...@git.apache.org>.
Github user aarondav commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2330#discussion_r18264123
  
    --- Diff: core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala ---
    @@ -34,11 +35,17 @@ import org.apache.spark.util.{ByteBufferInputStream, Utils}
      * This interface provides an immutable view for data in the form of bytes. The implementation
      * should specify how the data is provided:
      *
    - * - FileSegmentManagedBuffer: data backed by part of a file
    - * - NioByteBufferManagedBuffer: data backed by a NIO ByteBuffer
    - * - NettyByteBufManagedBuffer: data backed by a Netty ByteBuf
    + * - [[FileSegmentManagedBuffer]]: data backed by part of a file
    + * - [[NioManagedBuffer]]: data backed by a NIO ByteBuffer
    + * - [[NettyManagedBuffer]]: data backed by a Netty ByteBuf
    + *
    + * The concrete buffer implementation might be managed outside the JVM garbage collector.
    + * For example, in the case of [[NettyManagedBuffer]], the buffers are reference counted.
    + * In that case, if the buffer is going to be passed around to a different thread, retain/release
    --- End diff --
    
    The comment here suggests that you only have to retain if your buffer is a NettyManagedBuffer, which isn't quite true: you could have an NioManagedBuffer whose ByteBuffer shares memory with a Netty ByteBuf, in which case it is never safe to pass around without copying.
    
    I'm just a little worried about sanitary buffer usage, as misusing this API by not copying could lead to nondeterministic data corruption.
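The hazard described above can be shown with plain `java.nio.ByteBuffer`: a `duplicate()` view shares content with its source, so if the source memory is reused (as a pooled transport buffer would be after release), the view silently changes, while a deep copy does not. This is a minimal, self-contained sketch; `BufferAliasing` is a hypothetical name, and a wrapped heap array stands in for transport-owned pooled memory.

```scala
import java.nio.ByteBuffer

object BufferAliasing {
  /** Copy the remaining bytes of `buf` into a fresh heap buffer without moving `buf`'s position. */
  def deepCopy(buf: ByteBuffer): ByteBuffer = {
    val src = buf.duplicate()                       // independent position/limit, shared content
    val out = ByteBuffer.allocate(src.remaining())
    out.put(src)
    out.flip()
    out
  }

  /** Returns (byte 0 seen through a view, byte 0 seen through a copy) after the memory is reused. */
  def demo(): (Byte, Byte) = {
    val pooled = ByteBuffer.wrap(Array[Byte](1, 2, 3, 4)) // stands in for transport-owned memory
    val view = pooled.duplicate()   // analogous to holding a view without retaining or copying
    val copy = deepCopy(pooled)
    pooled.put(0, 9.toByte)         // the transport releases and reuses the memory
    (view.get(0), copy.get(0))
  }
}
```

Here the view reports 9 while the copy still reports 1; with a real pooled allocator the overwrite would depend on timing, which is what makes the corruption nondeterministic.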


[GitHub] spark pull request: [SPARK-3453] [WIP] Refactor Netty module to us...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2330#issuecomment-55351771
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20191/consoleFull) for   PR 2330 at commit [`55266d1`](https://github.com/apache/spark/commit/55266d1f7f9e2bf95b310a1d4d603c0df9e7b996).
     * This patch merges cleanly.


[GitHub] spark pull request: [SPARK-3453] [WIP] Refactor Netty module to us...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2330#issuecomment-55216326
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/54/consoleFull) for   PR 2330 at commit [`b32c3fe`](https://github.com/apache/spark/commit/b32c3fe568163614a6eb424e523f7dd545d8ce9e).
     * This patch **fails** unit tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


[GitHub] spark pull request: [SPARK-3453] Refactor Netty module to use Bloc...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2330#issuecomment-57242368
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/199/consoleFull) for   PR 2330 at commit [`3fbfd3f`](https://github.com/apache/spark/commit/3fbfd3f7a49600bdd867f3e363d3e858b04c1cf5).
     * This patch merges cleanly.


[GitHub] spark pull request: [SPARK-3453] [WIP] Refactor Netty module to us...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2330#issuecomment-55219255
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20138/consoleFull) for   PR 2330 at commit [`6e84cb2`](https://github.com/apache/spark/commit/6e84cb22460660880c557b6a0d11945c0bbeee0e).
     * This patch merges cleanly.


[GitHub] spark pull request: [SPARK-3453] [WIP] Refactor Netty module to us...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2330#issuecomment-54940550
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20029/consoleFull) for   PR 2330 at commit [`9b3b397`](https://github.com/apache/spark/commit/9b3b3973af78d5bcc680f46f8f162fb3d4bd69f8).
     * This patch **fails** unit tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


[GitHub] spark pull request: [SPARK-3453] [WIP] Refactor Netty module to us...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2330#issuecomment-55048008
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20053/consoleFull) for   PR 2330 at commit [`dd783ff`](https://github.com/apache/spark/commit/dd783ffb35d227aab301387edce2af38ca4f947b).
     * This patch **fails** unit tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


[GitHub] spark pull request: [SPARK-3453] [WIP] Refactor Netty module to us...

Posted by aarondav <gi...@git.apache.org>.
Github user aarondav commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2330#discussion_r17463602
  
    --- Diff: core/src/main/scala/org/apache/spark/network/netty/protocol.scala ---
    @@ -0,0 +1,260 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.network.netty
    +
    +import java.util.{List => JList}
    +
    +import io.netty.buffer.ByteBuf
    +import io.netty.channel.ChannelHandlerContext
    +import io.netty.channel.ChannelHandler.Sharable
    +import io.netty.handler.codec._
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.network.{NettyManagedBuffer, ManagedBuffer}
    +
    +
    +/** Messages from the client to the server. */
    +sealed trait ClientRequest {
    +  def id: Byte
    +}
    +
    +/**
    + * Request to fetch a sequence of blocks from the server. A single [[BlockFetchRequest]] can
    + * correspond to multiple [[ServerResponse]]s.
    + */
    +final case class BlockFetchRequest(blocks: Seq[String]) extends ClientRequest {
    +  override def id = 0
    +}
    +
    +/**
    + * Request to upload a block to the server. Currently the server does not ack the upload request.
    + */
    +final case class BlockUploadRequest(blockId: String, data: ManagedBuffer) extends ClientRequest {
    --- End diff --
    
    Ah, it seems the 0 or 1 is the message id (id in this class). Sorry.
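To illustrate the role of that id: a one-byte type tag at the start of each frame tells the decoder which payload layout follows. The sketch below assumes a hypothetical, simplified framing ([id: 1 byte][payload], written with `java.io.DataOutputStream`); `WireRequest`, `FetchBlocks`, `UploadBlock` and `RequestCodec` are illustrative names, and this is not the encoder/decoder in protocol.scala.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

// Hypothetical request types; the `id` byte is all the decoder needs to pick a payload layout.
sealed trait WireRequest { def id: Byte }

final case class FetchBlocks(blockIds: Seq[String]) extends WireRequest {
  val id: Byte = 0
}

final case class UploadBlock(blockId: String, data: Array[Byte]) extends WireRequest {
  val id: Byte = 1
}

object RequestCodec {
  /** Frame layout (assumed): [id: 1 byte][payload]. */
  def encode(req: WireRequest): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    out.writeByte(req.id)
    req match {
      case FetchBlocks(ids) =>
        out.writeInt(ids.size)              // number of block ids
        ids.foreach(b => out.writeUTF(b))
      case UploadBlock(blockId, data) =>
        out.writeUTF(blockId)
        out.writeInt(data.length)           // payload length, then raw bytes
        out.write(data)
    }
    out.flush()
    bytes.toByteArray
  }

  /** Dispatch on the leading id byte to choose the payload decoder. */
  def decode(frame: Array[Byte]): WireRequest = {
    val in = new DataInputStream(new ByteArrayInputStream(frame))
    in.readByte().toInt match {
      case 0 => FetchBlocks(Seq.fill(in.readInt())(in.readUTF()))
      case 1 =>
        val blockId = in.readUTF()
        val data = new Array[Byte](in.readInt())
        in.readFully(data)
        UploadBlock(blockId, data)
      case other => throw new IllegalArgumentException(s"Unknown request id: $other")
    }
  }
}
```

A single byte caps the protocol at 256 message types, which is ample for a small request vocabulary and keeps per-frame overhead minimal.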


[GitHub] spark pull request: [SPARK-3453] [WIP] Refactor Netty module to us...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on the pull request:

    https://github.com/apache/spark/pull/2330#issuecomment-55077449
  
    @colorant you probably would be interested in this ...


[GitHub] spark pull request: [SPARK-3453] Refactor Netty module to use Bloc...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2330#issuecomment-57247111
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/199/consoleFull) for   PR 2330 at commit [`3fbfd3f`](https://github.com/apache/spark/commit/3fbfd3f7a49600bdd867f3e363d3e858b04c1cf5).
     * This patch **fails** unit tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


[GitHub] spark pull request: [SPARK-3453] Netty-based BlockTransferService

Posted by pwendell <gi...@git.apache.org>.
Github user pwendell commented on the pull request:

    https://github.com/apache/spark/pull/2330#issuecomment-62344803
  
    This patch is in now so let's close this issue.


[GitHub] spark pull request: [SPARK-3453] Netty-based BlockTransferService

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2330#discussion_r18214243
  
    --- Diff: core/src/main/scala/org/apache/spark/network/netty/BlockServer.scala ---
    @@ -121,18 +83,18 @@ class BlockServer(conf: NettyConfig, dataProvider: BlockDataProvider) extends Lo
           bootstrap.option[java.lang.Integer](ChannelOption.SO_BACKLOG, backLog)
         }
         conf.receiveBuf.foreach { receiveBuf =>
    -      bootstrap.option[java.lang.Integer](ChannelOption.SO_RCVBUF, receiveBuf)
    +      bootstrap.childOption[java.lang.Integer](ChannelOption.SO_RCVBUF, receiveBuf)
         }
         conf.sendBuf.foreach { sendBuf =>
    -      bootstrap.option[java.lang.Integer](ChannelOption.SO_SNDBUF, sendBuf)
    +      bootstrap.childOption[java.lang.Integer](ChannelOption.SO_SNDBUF, sendBuf)
    --- End diff --
    
    I don't know if it's on purpose, but both send and receive buffer size options in `NettyConfig` use the same config key `spark.shuffle.io.sendBuffer`, so you might set both at the same time here.
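
One way to address this is to give each option its own key. A minimal sketch, assuming a hypothetical `BufferConfig` that reads from a plain `Map` instead of `SparkConf`; the key `spark.shuffle.io.receiveBuffer` is an assumed name, not one taken from this diff:

```scala
object BufferConfigSketch {
  // Hypothetical stand-in for NettyConfig. Each socket option reads its own key, so setting
  // the send buffer no longer silently sets the receive buffer as well.
  final class BufferConfig(settings: Map[String, String]) {
    val receiveBuf: Option[Int] = settings.get("spark.shuffle.io.receiveBuffer").map(_.toInt)
    val sendBuf: Option[Int] = settings.get("spark.shuffle.io.sendBuffer").map(_.toInt)
  }
}
```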


[GitHub] spark pull request: [SPARK-3453] [WIP] Refactor Netty module to us...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2330#issuecomment-54935715
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20029/consoleFull) for   PR 2330 at commit [`9b3b397`](https://github.com/apache/spark/commit/9b3b3973af78d5bcc680f46f8f162fb3d4bd69f8).
     * This patch merges cleanly.


[GitHub] spark pull request: [SPARK-3453] Refactor Netty module to use Bloc...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/2330#issuecomment-57247493
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21003/


[GitHub] spark pull request: [SPARK-3453] Refactor Netty module to use Bloc...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2330#issuecomment-57220735
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20988/consoleFull) for   PR 2330 at commit [`ca88068`](https://github.com/apache/spark/commit/ca88068553a01d8e9c4ee9bd8dd519775a527787).
     * This patch **fails** unit tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


[GitHub] spark pull request: [SPARK-3453] Refactor Netty module to use Bloc...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2330#issuecomment-57247486
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21003/consoleFull) for   PR 2330 at commit [`69f5d0a`](https://github.com/apache/spark/commit/69f5d0a2434396abbbd98886e047bc08a9e65565).
     * This patch **fails** unit tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


[GitHub] spark pull request: [SPARK-3453] Netty-based BlockTransferService

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/2330#issuecomment-57370658
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21057/


[GitHub] spark pull request: [SPARK-3453] [WIP] Refactor Netty module to us...

Posted by colorant <gi...@git.apache.org>.
Github user colorant commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2330#discussion_r17652889
  
    --- Diff: core/src/main/scala/org/apache/spark/network/netty/BlockServer.scala ---
    @@ -121,18 +91,18 @@ class BlockServer(conf: NettyConfig, dataProvider: BlockDataProvider) extends Lo
           bootstrap.option[java.lang.Integer](ChannelOption.SO_BACKLOG, backLog)
         }
         conf.receiveBuf.foreach { receiveBuf =>
    -      bootstrap.option[java.lang.Integer](ChannelOption.SO_RCVBUF, receiveBuf)
    +      bootstrap.childOption[java.lang.Integer](ChannelOption.SO_RCVBUF, receiveBuf)
         }
         conf.sendBuf.foreach { sendBuf =>
    -      bootstrap.option[java.lang.Integer](ChannelOption.SO_SNDBUF, sendBuf)
    +      bootstrap.childOption[java.lang.Integer](ChannelOption.SO_SNDBUF, sendBuf)
         }
     
         bootstrap.childHandler(new ChannelInitializer[SocketChannel] {
           override def initChannel(ch: SocketChannel): Unit = {
             ch.pipeline
    -          .addLast("frameDecoder", new LineBasedFrameDecoder(1024))  // max block id length 1024
    -          .addLast("stringDecoder", new StringDecoder(CharsetUtil.UTF_8))
    -          .addLast("blockHeaderEncoder", new BlockHeaderEncoder)
    +          .addLast("frameDecoder", ProtocolUtils.createFrameDecoder())
    +          .addLast("clientRequestDecoder", new ClientRequestDecoder)
    +          .addLast("serverResponseEncoder", new ServerResponseEncoder)
               .addLast("handler", new BlockServerHandler(dataProvider))
    --- End diff --
    
    Should this handler run on a separate EventLoopGroup, since `getBlockData` might block?
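
In Netty 4 the usual way to keep a blocking handler off the I/O threads is to register it with its own `EventExecutorGroup` (for example `pipeline.addLast(group, "handler", handler)` with a `DefaultEventExecutorGroup`). The sketch below shows the same pattern with plain JDK executors so it runs without Netty: a single-threaded "event loop" hands a hypothetical blocking `fetch` to a separate pool, and the response is handed back to the loop afterwards. All names here are illustrative assumptions.

```scala
import java.util.concurrent.{CountDownLatch, Executors, ThreadFactory, TimeUnit}
import java.util.concurrent.atomic.AtomicReference

object OffloadSketch {
  // Give every thread in a pool the same name so we can see which pool ran each step.
  private def named(name: String): ThreadFactory = new ThreadFactory {
    override def newThread(r: Runnable): Thread = {
      val t = new Thread(r, name)
      t.setDaemon(true)
      t
    }
  }

  /** Returns (thread that ran fetch, thread that "responded", fetched data). */
  def handle(blockId: String, fetch: String => String): (String, String, String) = {
    val eventLoop = Executors.newSingleThreadExecutor(named("event-loop"))
    val blockingPool = Executors.newFixedThreadPool(2, named("blocking-pool"))
    val done = new CountDownLatch(1)
    val result = new AtomicReference[(String, String, String)]()
    try {
      // The request arrives on the event loop, which immediately offloads the lookup.
      eventLoop.execute(new Runnable {
        override def run(): Unit = blockingPool.execute(new Runnable {
          override def run(): Unit = {
            val fetchThread = Thread.currentThread().getName
            val data = fetch(blockId) // may block, e.g. on disk I/O
            // Hand the response back to the event loop, as a handler would when writing to its channel.
            eventLoop.execute(new Runnable {
              override def run(): Unit = {
                result.set((fetchThread, Thread.currentThread().getName, data))
                done.countDown()
              }
            })
          }
        })
      })
      done.await(5, TimeUnit.SECONDS)
      result.get()
    } finally {
      eventLoop.shutdown()
      blockingPool.shutdown()
    }
  }
}
```

The trade-off is an extra thread hop per request in exchange for never stalling the I/O threads that serve other connections.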


[GitHub] spark pull request: [SPARK-3453] Netty-based BlockTransferService

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2330#discussion_r18240775
  
    --- Diff: core/src/main/scala/org/apache/spark/network/netty/BlockServer.scala ---
    @@ -74,42 +54,24 @@ class BlockServer(conf: NettyConfig, dataProvider: BlockDataProvider) extends Lo
       /** Initialize the server. */
       private def init(): Unit = {
         bootstrap = new ServerBootstrap
    -    val bossThreadFactory = Utils.namedThreadFactory("spark-shuffle-server-boss")
    -    val workerThreadFactory = Utils.namedThreadFactory("spark-shuffle-server-worker")
    +    val threadFactory = Utils.namedThreadFactory("spark-netty-server")
     
         // Use only one thread to accept connections, and 2 * num_cores for worker.
         def initNio(): Unit = {
    -      val bossGroup = new NioEventLoopGroup(1, bossThreadFactory)
    -      val workerGroup = new NioEventLoopGroup(0, workerThreadFactory)
    -      workerGroup.setIoRatio(conf.ioRatio)
    +      val bossGroup = new NioEventLoopGroup(conf.serverThreads, threadFactory)
    +      val workerGroup = bossGroup
           bootstrap.group(bossGroup, workerGroup).channel(classOf[NioServerSocketChannel])
         }
    -    def initOio(): Unit = {
    -      val bossGroup = new OioEventLoopGroup(1, bossThreadFactory)
    -      val workerGroup = new OioEventLoopGroup(0, workerThreadFactory)
    -      bootstrap.group(bossGroup, workerGroup).channel(classOf[OioServerSocketChannel])
    -    }
         def initEpoll(): Unit = {
    -      val bossGroup = new EpollEventLoopGroup(1, bossThreadFactory)
    -      val workerGroup = new EpollEventLoopGroup(0, workerThreadFactory)
    -      workerGroup.setIoRatio(conf.ioRatio)
    +      val bossGroup = new EpollEventLoopGroup(conf.serverThreads, threadFactory)
    +      val workerGroup = bossGroup
    --- End diff --
    
    Good point. And yes, we will very likely separate them in the future.


[GitHub] spark pull request: [SPARK-3453] [WIP] Refactor Netty module to us...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2330#discussion_r17502935
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala ---
    @@ -185,26 +223,34 @@ final class ShuffleBlockFetcherIterator(
         remoteRequests
       }
     
    +  /**
    +   * Fetch the local blocks while we are fetching remote blocks. This is ok because
    +   * [[ManagedBuffer]]'s memory is allocated lazily when we create the input stream, so all we
    +   * track in-memory are the ManagedBuffer references themselves.
    +   */
       private[this] def fetchLocalBlocks() {
    -    // Get the local blocks while remote blocks are being fetched. Note that it's okay to do
    -    // these all at once because they will just memory-map some files, so they won't consume
    -    // any memory that might exceed our maxBytesInFlight
    -    for (id <- localBlocks) {
    +    val iter = localBlocks.iterator
    +    while (iter.hasNext) {
    +      val blockId = iter.next()
           try {
    +        val buf = blockManager.getBlockData(blockId.toString)
             shuffleMetrics.localBlocksFetched += 1
    -        results.put(new FetchResult(
    -          id, 0, () => blockManager.getLocalShuffleFromDisk(id, serializer).get))
    -        logDebug("Got local block " + id)
    --- End diff --
    
    Yup, removed.
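
The new doc comment in this hunk argues that fetching every local block up front is safe because a `ManagedBuffer` only allocates memory once its input stream is created. A minimal sketch of that idea, assuming a hypothetical `LazyBuffer` with a pluggable `load` function in place of Spark's actual `ManagedBuffer` implementations:

```scala
import java.io.{ByteArrayInputStream, InputStream}

object LazyBufferSketch {
  /** Hypothetical lazily loaded buffer: it only remembers which block it refers to. */
  final class LazyBuffer(val blockId: String, load: String => Array[Byte]) {
    @volatile private var opened = false
    def isOpened: Boolean = opened

    /** The block's bytes are produced only here, when a consumer actually asks for them. */
    def createInputStream(): InputStream = {
      opened = true
      new ByteArrayInputStream(load(blockId))
    }
  }

  /** Collect a reference to every local block at once; no block data is read yet. */
  def fetchLocalBlocks(ids: Seq[String], load: String => Array[Byte]): Seq[LazyBuffer] =
    ids.map(id => new LazyBuffer(id, load))
}
```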


[GitHub] spark pull request: [SPARK-3453] [WIP] Refactor Netty module to us...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2330#discussion_r17650983
  
    --- Diff: core/src/main/scala/org/apache/spark/network/netty/BlockClientFactory.scala ---
    @@ -0,0 +1,182 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.network.netty
    +
    +import java.io.Closeable
    +import java.util.concurrent.{ConcurrentHashMap, TimeoutException}
    +
    +import io.netty.bootstrap.Bootstrap
    +import io.netty.buffer.PooledByteBufAllocator
    +import io.netty.channel._
    +import io.netty.channel.epoll.{Epoll, EpollEventLoopGroup, EpollSocketChannel}
    +import io.netty.channel.nio.NioEventLoopGroup
    +import io.netty.channel.oio.OioEventLoopGroup
    +import io.netty.channel.socket.SocketChannel
    +import io.netty.channel.socket.nio.NioSocketChannel
    +import io.netty.channel.socket.oio.OioSocketChannel
    +import io.netty.util.internal.PlatformDependent
    +
    +import org.apache.spark.{Logging, SparkConf}
    +import org.apache.spark.util.Utils
    +
    +
    +/**
    + * Factory for creating [[BlockClient]] by using createClient.
    + *
    + * The factory maintains a connection pool to other hosts and should return the same [[BlockClient]]
    + * for the same remote host. It also shares a single worker thread pool for all [[BlockClient]]s.
    + */
    +private[netty]
    +class BlockClientFactory(val conf: NettyConfig) extends Logging with Closeable {
    +
    +  def this(sparkConf: SparkConf) = this(new NettyConfig(sparkConf))
    +
    +  /** A thread factory so the threads are named (for debugging). */
    +  private[this] val threadFactory = Utils.namedThreadFactory("spark-netty-client")
    +
    +  /** Socket channel type, initialized by [[init]] depending ioMode. */
    +  private[this] var socketChannelClass: Class[_ <: Channel] = _
    +
    +  /** Thread pool shared by all clients. */
    +  private[this] var workerGroup: EventLoopGroup = _
    +
    +  private[this] val connectionPool = new ConcurrentHashMap[(String, Int), BlockClient]
    +
    +  // The encoders are stateless and can be shared among multiple clients.
    +  private[this] val encoder = new ClientRequestEncoder
    +  private[this] val decoder = new ServerResponseDecoder
    +
    +  init()
    +
    +  /** Initialize [[socketChannelClass]] and [[workerGroup]] based on ioMode. */
    +  private def init(): Unit = {
    +    def initOio(): Unit = {
    +      socketChannelClass = classOf[OioSocketChannel]
    +      workerGroup = new OioEventLoopGroup(0, threadFactory)
    +    }
    +    def initNio(): Unit = {
    +      socketChannelClass = classOf[NioSocketChannel]
    +      workerGroup = new NioEventLoopGroup(0, threadFactory)
    +    }
    +    def initEpoll(): Unit = {
    +      socketChannelClass = classOf[EpollSocketChannel]
    +      workerGroup = new EpollEventLoopGroup(0, threadFactory)
    +    }
    +
    +    // For auto mode, first try epoll (only available on Linux), then nio.
    +    conf.ioMode match {
    +      case "nio" => initNio()
    +      case "oio" => initOio()
    +      case "epoll" => initEpoll()
    +      case "auto" => if (Epoll.isAvailable) initEpoll() else initNio()
    +    }
    +  }
    +
    +  /**
    +   * Create a new BlockFetchingClient connecting to the given remote host / port.
    +   *
    +   * This blocks until a connection is successfully established.
    +   *
    +   * Concurrency: This method is safe to call from multiple threads.
    +   */
    +  def createClient(remoteHost: String, remotePort: Int): BlockClient = {
    +    // Get connection from the connection pool first.
    +    // If it is not found or not active, create a new one.
    +    val cachedClient = connectionPool.get((remoteHost, remotePort))
    +    if (cachedClient != null && cachedClient.isActive) {
    +      return cachedClient
    +    }
    +
    +    logInfo(s"Creating new connection to $remoteHost:$remotePort")
    +
    +    // There is a chance two threads are creating two different clients connecting to the same host.
    +    // But that's probably ok ...
    +
    +    val handler = new BlockClientHandler
    --- End diff --
    
    I don't think so -- isn't it stateful? Also, it is tricky to handle connection failure in this case.
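
The code comment in this hunk accepts that two threads may each create a client for the same host. If that race ever needs closing, one option is to let `ConcurrentHashMap.putIfAbsent` and `replace` pick a single winner and close the loser. A minimal sketch, assuming a hypothetical `Client` with an `active` flag in place of `BlockClient`; it does not address the stateful-handler concern raised in the reply:

```scala
import java.util.concurrent.ConcurrentHashMap

object ClientPoolSketch {
  /** Hypothetical client: only tracks whether it is still usable. */
  final class Client(val host: String, val port: Int) {
    @volatile var active = true
    def close(): Unit = { active = false }
  }

  final class ClientPool {
    private val pool = new ConcurrentHashMap[(String, Int), Client]()

    def getOrCreate(host: String, port: Int): Client = {
      val key = (host, port)
      var result: Client = null
      while (result == null) {
        val cached = pool.get(key)
        if (cached != null && cached.active) {
          result = cached
        } else {
          val fresh = new Client(host, port) // stands in for opening a real connection
          // Install `fresh` only if the map still holds what we saw; otherwise another thread won.
          val installed =
            if (cached == null) pool.putIfAbsent(key, fresh) == null
            else pool.replace(key, cached, fresh)
          if (installed) result = fresh
          else fresh.close() // lost the race: drop our duplicate and retry
        }
      }
      result
    }
  }
}
```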


[GitHub] spark pull request: [SPARK-3453] Netty-based BlockTransferService

Posted by aarondav <gi...@git.apache.org>.
Github user aarondav commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2330#discussion_r18421303
  
    --- Diff: core/src/main/scala/org/apache/spark/network/netty/BlockClient.scala ---
    @@ -0,0 +1,125 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.network.netty
    +
    +import java.io.Closeable
    +import java.util.concurrent.TimeoutException
    +
    +import scala.concurrent.{Future, promise}
    +
    +import io.netty.channel.{ChannelFuture, ChannelFutureListener}
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.network.{ManagedBuffer, BlockFetchingListener}
    +import org.apache.spark.storage.StorageLevel
    +
    +
    +/**
    + * Client for [[NettyBlockTransferService]]. The connection to server must have been established
    + * using [[BlockClientFactory]] before instantiating this.
    + *
    + * This class is used to make requests to the server , while [[BlockClientHandler]] is responsible
    + * for handling responses from the server.
    + *
    + * Concurrency: thread safe and can be called from multiple threads.
    + *
    + * @param cf the ChannelFuture for the connection.
    + * @param handler [[BlockClientHandler]] for handling outstanding requests.
    + */
    +@throws[TimeoutException]
    +private[netty]
    +class BlockClient(cf: ChannelFuture, handler: BlockClientHandler) extends Closeable with Logging {
    +
    +  private[this] val serverAddr = cf.channel().remoteAddress().toString
    --- End diff --
    
    If the channel is not connected, remoteAddress() may return null. We should probably avoid an NPE here.
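
A minimal sketch of a null-safe description, assuming the value passed in would be `cf.channel().remoteAddress()`; `RemoteAddressSketch` and the `<unknown remote>` placeholder are hypothetical:

```scala
import java.net.SocketAddress

object RemoteAddressSketch {
  /** Describe a peer address, tolerating the null that an unconnected channel may report. */
  def describe(addr: SocketAddress): String =
    Option(addr).map(_.toString).getOrElse("<unknown remote>")
}
```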


[GitHub] spark pull request: [SPARK-3453] Refactor Netty module to use Bloc...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2330#issuecomment-57259108
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/203/consoleFull) for   PR 2330 at commit [`bc9ed22`](https://github.com/apache/spark/commit/bc9ed22d4d9fd36599a076a0a201a1809ea3a24c).
     * This patch **fails** unit tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


[GitHub] spark pull request: [SPARK-3453] [WIP] Refactor Netty module to us...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2330#issuecomment-55369434
  
    **[Tests timed out](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20206/consoleFull)** after     a configured wait of `120m`.


[GitHub] spark pull request: [SPARK-3453] Netty-based BlockTransferService

Posted by aarondav <gi...@git.apache.org>.
Github user aarondav commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2330#discussion_r18263398
  
    --- Diff: core/src/main/scala/org/apache/spark/network/BlockDataManager.scala ---
    @@ -20,14 +20,14 @@ package org.apache.spark.network
     import org.apache.spark.storage.StorageLevel
     
     
    +private[spark]
     trait BlockDataManager {
    --- End diff --
    
    Did you add this class at some point? Either way, would you mind adding a class comment? It's not clear how it differs from all the other block-related managers.


[GitHub] spark pull request: [SPARK-3453] Netty-based BlockTransferService

Posted by aarondav <gi...@git.apache.org>.
Github user aarondav commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2330#discussion_r18263375
  
    --- Diff: core/src/main/scala/org/apache/spark/SparkEnv.scala ---
    @@ -234,7 +236,12 @@ object SparkEnv extends Logging {
     
         val shuffleMemoryManager = new ShuffleMemoryManager(conf)
     
    -    val blockTransferService = new NioBlockTransferService(conf, securityManager)
    +    // TODO(rxin): Config option based on class name, similar to shuffle mgr and compression codec.
    +    val blockTransferService = if (conf.getBoolean("spark.shuffle.use.netty", false)) {
    --- End diff --
    
    It'd be better if we could go ahead and make the config stable now; it's always painful for people to update it later. Even if it's just something like:
    
    ```scala
    val blockTransferService = conf.get("spark.shuffle.transferService", "NIO") match {
      case "NETTY" => new NettyBlockTransferService(conf)
      case "NIO" => new NioBlockTransferService(conf, securityManager)
      case s => throw new UnsupportedOperationException("Unknown transfer service: " + s)
    }
    ```
    
    Potentially less pain for when we address the TODO at a later date.
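
The TODO in the diff mentions choosing the service by class name, as is done for the shuffle manager and compression codec. A minimal sketch of the lookup step, assuming hypothetical fully qualified class names and omitting reflective instantiation (e.g. via `Class.forName`), which would also need the right constructor arguments:

```scala
import java.util.Locale

object TransferServiceNameSketch {
  // Hypothetical fully qualified names; the real package layout may differ.
  private val shortNames = Map(
    "netty" -> "org.apache.spark.network.netty.NettyBlockTransferService",
    "nio" -> "org.apache.spark.network.nio.NioBlockTransferService")

  /** Short aliases are matched case-insensitively; any other value is taken as a class name. */
  def resolve(value: String): String =
    shortNames.getOrElse(value.trim.toLowerCase(Locale.ROOT), value.trim)
}
```

Keeping short aliases alongside full class names means a stable key like the one suggested above keeps working if the selection later becomes class-name based.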


[GitHub] spark pull request: [SPARK-3453] [WIP] Refactor Netty module to us...

Posted by colorant <gi...@git.apache.org>.
Github user colorant commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2330#discussion_r17459366
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala ---
    @@ -185,26 +223,34 @@ final class ShuffleBlockFetcherIterator(
         remoteRequests
       }
     
    +  /**
    +   * Fetch the local blocks while we are fetching remote blocks. This is ok because
    +   * [[ManagedBuffer]]'s memory is allocated lazily when we create the input stream, so all we
    +   * track in-memory are the ManagedBuffer references themselves.
    +   */
       private[this] def fetchLocalBlocks() {
    -    // Get the local blocks while remote blocks are being fetched. Note that it's okay to do
    -    // these all at once because they will just memory-map some files, so they won't consume
    -    // any memory that might exceed our maxBytesInFlight
    -    for (id <- localBlocks) {
    +    val iter = localBlocks.iterator
    +    while (iter.hasNext) {
    +      val blockId = iter.next()
           try {
    +        val buf = blockManager.getBlockData(blockId.toString)
             shuffleMetrics.localBlocksFetched += 1
    -        results.put(new FetchResult(
    -          id, 0, () => blockManager.getLocalShuffleFromDisk(id, serializer).get))
    -        logDebug("Got local block " + id)
    --- End diff --
    
    Then it looks like nothing else uses `blockManager.getLocalShuffleFromDisk`, so it could be removed.
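The new doc comment argues that fetching all local blocks at once is safe because holding a [[ManagedBuffer]] reference is cheap: its memory is only allocated when the input stream is created. The sketch below illustrates that idea under one assumption. A hypothetical `LazyBuffer` trait stands in for Spark's actual `ManagedBuffer` API, and its bytes are loaded only inside `createInputStream()`. The counter exists purely so that the laziness can be observed.

```scala
import java.io.{ByteArrayInputStream, InputStream}
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for a lazily backed buffer; this is not Spark's ManagedBuffer API.
trait LazyBuffer {
  def createInputStream(): InputStream
}

// Counts how many times block bytes are actually loaded, so the laziness is observable.
object Materializations {
  val count = new AtomicInteger(0)
}

// A local block whose bytes are loaded only when a stream is requested.
final class LocalBlockBuffer(val blockId: String, load: () => Array[Byte]) extends LazyBuffer {
  override def createInputStream(): InputStream = {
    Materializations.count.incrementAndGet()
    new ByteArrayInputStream(load())
  }
}

object LocalFetchSketch {
  // Builds one buffer reference per local block id without reading any block data.
  // The loader here just returns the id's bytes; a real one would read or memory-map a file.
  def fetchLocalBlocks(ids: Seq[String]): Seq[LazyBuffer] =
    ids.map(id => new LocalBlockBuffer(id, () => id.getBytes("UTF-8")))
}
```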


[GitHub] spark pull request: [SPARK-3453] [WIP] Refactor Netty module to us...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2330#issuecomment-55357993
  
    **[Tests timed out](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20191/consoleFull)** after a configured wait of `120m`.


[GitHub] spark pull request: [SPARK-3453] [WIP] Refactor Netty module to us...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2330#issuecomment-55213214
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20131/consoleFull) for   PR 2330 at commit [`b32c3fe`](https://github.com/apache/spark/commit/b32c3fe568163614a6eb424e523f7dd545d8ce9e).
     * This patch merges cleanly.


[GitHub] spark pull request: [SPARK-3453] [WIP] Refactor Netty module to us...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2330#issuecomment-55856201
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20451/consoleFull) for   PR 2330 at commit [`a79a259`](https://github.com/apache/spark/commit/a79a25918a96171c4b20c5c9153e5815bc23698e).
     * This patch **passes** unit tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


[GitHub] spark pull request: [SPARK-3453] [WIP] Refactor Netty module to us...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2330#discussion_r17463593
  
    --- Diff: core/src/main/scala/org/apache/spark/network/netty/protocol.scala ---
    @@ -0,0 +1,260 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.network.netty
    +
    +import java.util.{List => JList}
    +
    +import io.netty.buffer.ByteBuf
    +import io.netty.channel.ChannelHandlerContext
    +import io.netty.channel.ChannelHandler.Sharable
    +import io.netty.handler.codec._
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.network.{NettyManagedBuffer, ManagedBuffer}
    +
    +
    +/** Messages from the client to the server. */
    +sealed trait ClientRequest {
    +  def id: Byte
    +}
    +
    +/**
    + * Request to fetch a sequence of blocks from the server. A single [[BlockFetchRequest]] can
    + * correspond to multiple [[ServerResponse]]s.
    + */
    +final case class BlockFetchRequest(blocks: Seq[String]) extends ClientRequest {
    +  override def id = 0
    +}
    +
    +/**
    + * Request to upload a block to the server. Currently the server does not ack the upload request.
    + */
    +final case class BlockUploadRequest(blockId: String, data: ManagedBuffer) extends ClientRequest {
    --- End diff --
    
    It's not `blockId`, it's `id`. I should probably rename it to `messageTypeId`.
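Naming aside, the `id` field exists so that a decoder can tell message types apart on the wire. The sketch below shows that dispatch using the `messageTypeId` name suggested above. The wire layout is a hypothetical length-prefixed format built on `java.nio.ByteBuffer`: [frameLength: Int][messageTypeId: Byte][numBlocks: Int], then (length, UTF-8 bytes) per block id. The PR's actual encoder works on Netty `ByteBuf`s, and its format may differ. `RequestCodec` is also hypothetical.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets.UTF_8

// Hypothetical client messages, with the field named messageTypeId as suggested above.
sealed trait ClientRequest { def messageTypeId: Byte }

final case class BlockFetchRequest(blocks: Seq[String]) extends ClientRequest {
  override val messageTypeId: Byte = 0
}

object RequestCodec {
  /** Encodes a fetch request into one length-prefixed frame (assumed layout, see above). */
  def encode(req: BlockFetchRequest): ByteBuffer = {
    val ids = req.blocks.map(_.getBytes(UTF_8))
    val bodyLength = 1 + 4 + ids.map(4 + _.length).sum // type byte + count + (len, bytes) pairs
    val buf = ByteBuffer.allocate(4 + bodyLength)
    buf.putInt(bodyLength).put(req.messageTypeId).putInt(ids.size)
    ids.foreach(id => buf.putInt(id.length).put(id))
    buf.flip()
    buf
  }

  /** Decodes a frame produced by encode, dispatching on the messageTypeId byte. */
  def decode(frame: ByteBuffer): ClientRequest = {
    val bodyLength = frame.getInt()
    require(frame.remaining() == bodyLength, "truncated frame")
    frame.get().toInt match {
      case 0 =>
        val n = frame.getInt()
        val blocks = (0 until n).map { _ =>
          val bytes = new Array[Byte](frame.getInt())
          frame.get(bytes)
          new String(bytes, UTF_8)
        }
        BlockFetchRequest(blocks)
      case other =>
        throw new IllegalArgumentException(s"Unknown message type: $other")
    }
  }
}
```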


[GitHub] spark pull request: [SPARK-3453] [WIP] Refactor Netty module to us...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2330#issuecomment-55041582
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20053/consoleFull) for   PR 2330 at commit [`dd783ff`](https://github.com/apache/spark/commit/dd783ffb35d227aab301387edce2af38ca4f947b).
     * This patch merges cleanly.


[GitHub] spark pull request: [SPARK-3453] Refactor Netty module to use Bloc...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2330#issuecomment-57259323
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21010/consoleFull) for   PR 2330 at commit [`bc9ed22`](https://github.com/apache/spark/commit/bc9ed22d4d9fd36599a076a0a201a1809ea3a24c).
     * This patch **fails** unit tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


[GitHub] spark pull request: [SPARK-3453] Refactor Netty module to use Bloc...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2330#issuecomment-57224233
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/193/consoleFull) for   PR 2330 at commit [`3fbfd3f`](https://github.com/apache/spark/commit/3fbfd3f7a49600bdd867f3e363d3e858b04c1cf5).
     * This patch merges cleanly.


[GitHub] spark pull request: [SPARK-3453] Netty-based BlockTransferService

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/2330#issuecomment-57276584
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21025/


[GitHub] spark pull request: [SPARK-3453] [WIP] Refactor Netty module to us...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2330#issuecomment-55371346
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20215/consoleFull) for   PR 2330 at commit [`8295561`](https://github.com/apache/spark/commit/8295561a9befcfa3c2a56d8836e05a42ce4ab6b0).
     * This patch merges cleanly.


[GitHub] spark pull request: [SPARK-3453] Netty-based BlockTransferService

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2330#issuecomment-57279534
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21032/consoleFull) for   PR 2330 at commit [`ad09236`](https://github.com/apache/spark/commit/ad092361f649b82dff64c44a30b50af1e9cccc0c).
     * This patch merges cleanly.


[GitHub] spark pull request: [SPARK-3453] [WIP] Refactor Netty module to us...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2330#issuecomment-55015796
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20047/consoleFull) for   PR 2330 at commit [`9b3b397`](https://github.com/apache/spark/commit/9b3b3973af78d5bcc680f46f8f162fb3d4bd69f8).
     * This patch **fails** unit tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


[GitHub] spark pull request: [SPARK-3453] [WIP] Refactor Netty module to us...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2330#issuecomment-55854376
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20452/consoleFull) for   PR 2330 at commit [`088ed8a`](https://github.com/apache/spark/commit/088ed8ac46bc59bf3d13d4a0be1d4c616a22d698).
     * This patch merges cleanly.


[GitHub] spark pull request: [SPARK-3453] Refactor Netty module to use Bloc...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2330#issuecomment-57259035
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/203/consoleFull) for   PR 2330 at commit [`bc9ed22`](https://github.com/apache/spark/commit/bc9ed22d4d9fd36599a076a0a201a1809ea3a24c).
     * This patch merges cleanly.


[GitHub] spark pull request: [SPARK-3453] [WIP] Refactor Netty module to us...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2330#issuecomment-55376546
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20215/consoleFull) for   PR 2330 at commit [`8295561`](https://github.com/apache/spark/commit/8295561a9befcfa3c2a56d8836e05a42ce4ab6b0).
     * This patch **fails** unit tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


[GitHub] spark pull request: [SPARK-3453] [WIP] Refactor Netty module to us...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2330#issuecomment-55858115
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20452/consoleFull) for   PR 2330 at commit [`088ed8a`](https://github.com/apache/spark/commit/088ed8ac46bc59bf3d13d4a0be1d4c616a22d698).
     * This patch **fails** unit tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


[GitHub] spark pull request: [SPARK-3453] Netty-based BlockTransferService

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2330#issuecomment-57381585
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/216/consoleFull) for   PR 2330 at commit [`bdab2c7`](https://github.com/apache/spark/commit/bdab2c74111c8bce382323f68732f87ca9b080a9).
     * This patch **passes** unit tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


[GitHub] spark pull request: [SPARK-3453] [WIP] Refactor Netty module to us...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2330#issuecomment-55362560
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20206/consoleFull) for   PR 2330 at commit [`6ddaa5d`](https://github.com/apache/spark/commit/6ddaa5d6fc893a759be81347a401296eef8c566c).
     * This patch merges cleanly.


[GitHub] spark pull request: [SPARK-3453] Netty-based BlockTransferService

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/2330#issuecomment-57286813
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21032/


[GitHub] spark pull request: [SPARK-3453] Refactor Netty module to use Bloc...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2330#issuecomment-57231963
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/193/consoleFull) for   PR 2330 at commit [`3fbfd3f`](https://github.com/apache/spark/commit/3fbfd3f7a49600bdd867f3e363d3e858b04c1cf5).
     * This patch **fails** unit tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


[GitHub] spark pull request: [SPARK-3453] [WIP] Refactor Netty module to us...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2330#issuecomment-55222920
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20138/consoleFull) for   PR 2330 at commit [`6e84cb2`](https://github.com/apache/spark/commit/6e84cb22460660880c557b6a0d11945c0bbeee0e).
     * This patch **fails** unit tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


[GitHub] spark pull request: [SPARK-3453] [WIP] Refactor Netty module to us...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2330#discussion_r17463503
  
    --- Diff: core/src/main/scala/org/apache/spark/network/netty/BlockClientFactory.scala ---
    @@ -0,0 +1,179 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.network.netty
    +
    +import java.util.concurrent.{ConcurrentHashMap, TimeoutException}
    +
    +import io.netty.bootstrap.Bootstrap
    +import io.netty.buffer.PooledByteBufAllocator
    +import io.netty.channel._
    +import io.netty.channel.epoll.{Epoll, EpollEventLoopGroup, EpollSocketChannel}
    +import io.netty.channel.nio.NioEventLoopGroup
    +import io.netty.channel.oio.OioEventLoopGroup
    +import io.netty.channel.socket.SocketChannel
    +import io.netty.channel.socket.nio.NioSocketChannel
    +import io.netty.channel.socket.oio.OioSocketChannel
    +import io.netty.util.internal.PlatformDependent
    +
    +import org.apache.spark.SparkConf
    +import org.apache.spark.util.Utils
    +
    +
    +/**
    + * Factory for creating [[BlockClient]] by using createClient.
    + *
    + * The factory maintains a connection pool to other hosts and should return the same [[BlockClient]]
    + * for the same remote host. It also shares a single worker thread pool for all [[BlockClient]]s.
    + */
    +private[netty]
    +class BlockClientFactory(val conf: NettyConfig) {
    +
    +  def this(sparkConf: SparkConf) = this(new NettyConfig(sparkConf))
    +
    +  /** A thread factory so the threads are named (for debugging). */
    +  private[this] val threadFactory = Utils.namedThreadFactory("spark-netty-client")
    +
    +  /** Socket channel type, initialized by [[init]] depending ioMode. */
    +  private[this] var socketChannelClass: Class[_ <: Channel] = _
    +
    +  /** Thread pool shared by all clients. */
    +  private[this] var workerGroup: EventLoopGroup = _
    +
    +  private[this] val connectionPool = new ConcurrentHashMap[(String, Int), BlockClient]
    +
    +  // The encoders are stateless and can be shared among multiple clients.
    +  private[this] val encoder = new ClientRequestEncoder
    +  private[this] val decoder = new ServerResponseDecoder
    +
    +  init()
    +
    +  /** Initialize [[socketChannelClass]] and [[workerGroup]] based on ioMode. */
    +  private def init(): Unit = {
    +    def initOio(): Unit = {
    +      socketChannelClass = classOf[OioSocketChannel]
    +      workerGroup = new OioEventLoopGroup(0, threadFactory)
    +    }
    +    def initNio(): Unit = {
    +      socketChannelClass = classOf[NioSocketChannel]
    +      workerGroup = new NioEventLoopGroup(0, threadFactory)
    +    }
    +    def initEpoll(): Unit = {
    +      socketChannelClass = classOf[EpollSocketChannel]
    +      workerGroup = new EpollEventLoopGroup(0, threadFactory)
    +    }
    +
    +    // For auto mode, first try epoll (only available on Linux), then nio.
    +    conf.ioMode match {
    +      case "nio" => initNio()
    +      case "oio" => initOio()
    +      case "epoll" => initEpoll()
    +      case "auto" => if (Epoll.isAvailable) initEpoll() else initNio()
    +    }
    +  }
    +
    +  /**
    +   * Create a new BlockFetchingClient connecting to the given remote host / port.
    +   *
    +   * This blocks until a connection is successfully established.
    +   *
    +   * Concurrency: This method is safe to call from multiple threads.
    +   */
    +  def createClient(remoteHost: String, remotePort: Int): BlockClient = {
    +    // Get connection from the connection pool first.
    +    // If it is not found or not active, create a new one.
    +    val cachedClient = connectionPool.get((remoteHost, remotePort))
    +    if (cachedClient != null && cachedClient.isActive) {
    +      return cachedClient
    +    }
    +
    +    // There is a chance two threads are creating two different clients connecting to the same host.
    +    // But that's probably ok ...
    +
    +    val handler = new BlockClientHandler
    +
    +    val bootstrap = new Bootstrap
    +    bootstrap.group(workerGroup)
    +      .channel(socketChannelClass)
    +      // Disable Nagle's Algorithm since we don't want packets to wait
    +      .option(ChannelOption.TCP_NODELAY, java.lang.Boolean.TRUE)
    +      .option(ChannelOption.SO_KEEPALIVE, java.lang.Boolean.TRUE)
    +      .option[Integer](ChannelOption.CONNECT_TIMEOUT_MILLIS, conf.connectTimeoutMs)
    +
    +    // Use pooled buffers to reduce temporary buffer allocation
    +    bootstrap.option(ChannelOption.ALLOCATOR, createPooledByteBufAllocator())
    +
    +    bootstrap.handler(new ChannelInitializer[SocketChannel] {
    +      override def initChannel(ch: SocketChannel): Unit = {
    +        ch.pipeline
    +          .addLast("clientRequestEncoder", encoder)
    +          .addLast("frameDecoder", ProtocolUtils.createFrameDecoder())
    +          .addLast("serverResponseDecoder", decoder)
    +          .addLast("handler", handler)
    +      }
    +    })
    +
    +    // Connect to the remote server
    +    val cf: ChannelFuture = bootstrap.connect(remoteHost, remotePort)
    +    if (!cf.awaitUninterruptibly(conf.connectTimeoutMs)) {
    +      throw new TimeoutException(
    +        s"Connecting to $remoteHost:$remotePort timed out (${conf.connectTimeoutMs} ms)")
    +    }
    +
    +    val client = new BlockClient(cf, handler)
    +    connectionPool.put((remoteHost, remotePort), client)
    +    client
    +  }
    +
    +  /** Close all connections in the connection pool, and shutdown the worker thread pool. */
    +  def stop(): Unit = {
    +    val iter = connectionPool.entrySet().iterator()
    +    while (iter.hasNext) {
    +      val entry = iter.next()
    +      entry.getValue.close()
    +      connectionPool.remove(entry.getKey)
    +    }
    +
    +    if (workerGroup != null) {
    +      workerGroup.shutdownGracefully()
    +    }
    +  }
    +
    +  /**
    +   * Create a pooled ByteBuf allocator but disables the thread-local cache. Thread-local caches
    +   * are disabled because the ByteBufs are allocated by the event loop thread, but released by the
    +   * executor thread rather than the event loop thread. Those thread-local caches actually delay
    +   * the recycling of buffers, leading to larger memory usage.
    +   */
    +  private def createPooledByteBufAllocator(): PooledByteBufAllocator = {
    --- End diff --
    
    This is SPARK-3503
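The quoted `createClient` tolerates two threads racing to open a client to the same host. As far as the quoted code shows, the second `connectionPool.put` then replaces the first entry, so the first client is no longer tracked and `stop()` will not close it. The sketch below shows one way to avoid that. `ConcurrentHashMap.putIfAbsent` lets the losing thread close its duplicate and use the winner's client instead. `Client`, `ClientPool` and the caller-supplied `connect` function are hypothetical stand-ins for `BlockClient`, `BlockClientFactory` and the Netty bootstrap.

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical stand-in for BlockClient.
trait Client {
  def isActive: Boolean
  def close(): Unit
}

/**
 * Caches one client per (host, port), like the connectionPool in BlockClientFactory.
 * `connect` is a hypothetical function that opens a new connection.
 */
final class ClientPool(connect: (String, Int) => Client) {
  private val pool = new ConcurrentHashMap[(String, Int), Client]()

  def get(host: String, port: Int): Client = {
    val key = (host, port)
    val cached = pool.get(key)
    if (cached != null && cached.isActive) {
      cached
    } else {
      // Drop a stale entry, but only if no other thread has replaced it meanwhile.
      if (cached != null) pool.remove(key, cached)
      val fresh = connect(host, port)
      // putIfAbsent returns the existing value if another thread won the race.
      val winner = pool.putIfAbsent(key, fresh)
      if (winner == null) fresh
      else {
        fresh.close() // close our duplicate instead of leaking it
        winner
      }
    }
  }

  /** Closes every cached client and empties the pool. */
  def stop(): Unit = {
    val it = pool.values().iterator()
    while (it.hasNext) it.next().close()
    pool.clear()
  }
}
```

The trade-off is that a losing thread still pays for one extra connect. In return, every client that stays in use is tracked by the pool.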


[GitHub] spark pull request: [SPARK-3453] [WIP] Refactor Netty module to us...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2330#discussion_r18121216
  
    --- Diff: core/src/main/scala/org/apache/spark/network/netty/BlockClientFactory.scala ---
    @@ -0,0 +1,182 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.network.netty
    +
    +import java.io.Closeable
    +import java.util.concurrent.{ConcurrentHashMap, TimeoutException}
    +
    +import io.netty.bootstrap.Bootstrap
    +import io.netty.buffer.PooledByteBufAllocator
    +import io.netty.channel._
    +import io.netty.channel.epoll.{Epoll, EpollEventLoopGroup, EpollSocketChannel}
    +import io.netty.channel.nio.NioEventLoopGroup
    +import io.netty.channel.oio.OioEventLoopGroup
    +import io.netty.channel.socket.SocketChannel
    +import io.netty.channel.socket.nio.NioSocketChannel
    +import io.netty.channel.socket.oio.OioSocketChannel
    +import io.netty.util.internal.PlatformDependent
    +
    +import org.apache.spark.{Logging, SparkConf}
    +import org.apache.spark.util.Utils
    +
    +
    +/**
    + * Factory for creating [[BlockClient]] by using createClient.
    + *
    + * The factory maintains a connection pool to other hosts and should return the same [[BlockClient]]
    + * for the same remote host. It also shares a single worker thread pool for all [[BlockClient]]s.
    + */
    +private[netty]
    +class BlockClientFactory(val conf: NettyConfig) extends Logging with Closeable {
    +
    +  def this(sparkConf: SparkConf) = this(new NettyConfig(sparkConf))
    +
    +  /** A thread factory so the threads are named (for debugging). */
    +  private[this] val threadFactory = Utils.namedThreadFactory("spark-netty-client")
    +
    +  /** Socket channel type, initialized by [[init]] depending on ioMode. */
    +  private[this] var socketChannelClass: Class[_ <: Channel] = _
    +
    +  /** Thread pool shared by all clients. */
    +  private[this] var workerGroup: EventLoopGroup = _
    +
    +  private[this] val connectionPool = new ConcurrentHashMap[(String, Int), BlockClient]
    +
    +  // The encoders are stateless and can be shared among multiple clients.
    +  private[this] val encoder = new ClientRequestEncoder
    +  private[this] val decoder = new ServerResponseDecoder
    +
    +  init()
    +
    +  /** Initialize [[socketChannelClass]] and [[workerGroup]] based on ioMode. */
    +  private def init(): Unit = {
    +    def initOio(): Unit = {
    +      socketChannelClass = classOf[OioSocketChannel]
    +      workerGroup = new OioEventLoopGroup(0, threadFactory)
    +    }
    +    def initNio(): Unit = {
    +      socketChannelClass = classOf[NioSocketChannel]
    +      workerGroup = new NioEventLoopGroup(0, threadFactory)
    +    }
    +    def initEpoll(): Unit = {
    +      socketChannelClass = classOf[EpollSocketChannel]
    +      workerGroup = new EpollEventLoopGroup(0, threadFactory)
    +    }
    +
    +    // For auto mode, first try epoll (only available on Linux), then nio.
    +    conf.ioMode match {
    +      case "nio" => initNio()
    +      case "oio" => initOio()
    +      case "epoll" => initEpoll()
    +      case "auto" => if (Epoll.isAvailable) initEpoll() else initNio()
    +    }
    +  }
    +
    +  /**
    +   * Create a new BlockFetchingClient connecting to the given remote host / port.
    +   *
    +   * This blocks until a connection is successfully established.
    +   *
    +   * Concurrency: This method is safe to call from multiple threads.
    +   */
    +  def createClient(remoteHost: String, remotePort: Int): BlockClient = {
    +    // Get connection from the connection pool first.
    +    // If it is not found or not active, create a new one.
    +    val cachedClient = connectionPool.get((remoteHost, remotePort))
    +    if (cachedClient != null && cachedClient.isActive) {
    +      return cachedClient
    +    }
    +
    +    logInfo(s"Creating new connection to $remoteHost:$remotePort")
    +
    +    // There is a chance two threads are creating two different clients connecting to the same host.
    +    // But that's probably ok ...
    +
    +    val handler = new BlockClientHandler
    +
    +    val bootstrap = new Bootstrap
    +    bootstrap.group(workerGroup)
    +      .channel(socketChannelClass)
    +      // Disable Nagle's Algorithm since we don't want packets to wait
    +      .option(ChannelOption.TCP_NODELAY, java.lang.Boolean.TRUE)
    +      .option(ChannelOption.SO_KEEPALIVE, java.lang.Boolean.TRUE)
    +      .option[Integer](ChannelOption.CONNECT_TIMEOUT_MILLIS, conf.connectTimeoutMs)
    +
    +    // Use pooled buffers to reduce temporary buffer allocation
    +    bootstrap.option(ChannelOption.ALLOCATOR, createPooledByteBufAllocator())
    +
    +    bootstrap.handler(new ChannelInitializer[SocketChannel] {
    +      override def initChannel(ch: SocketChannel): Unit = {
    +        ch.pipeline
    +          .addLast("clientRequestEncoder", encoder)
    +          .addLast("frameDecoder", ProtocolUtils.createFrameDecoder())
    +          .addLast("serverResponseDecoder", decoder)
    +          .addLast("handler", handler)
    --- End diff --
    
    So I tested this and with the current setup I can get close to 1GB/s per node. That's pretty good so I wouldn't worry about it.
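The quoted `createClient` accepts that two threads may both miss the pool and open separate connections to the same host ("probably ok"). If that ever needs tightening, one option is to connect outside the map, publish with `putIfAbsent` or `replace`, and close whichever client loses. Below is a minimal sketch of that approach; it is not what this PR does. `PooledClient` and `ClientPool` are hypothetical stand-ins for `BlockClient` and the factory's `connectionPool`.

```scala
import java.util.concurrent.ConcurrentHashMap

/** Hypothetical stand-in for BlockClient: only what the pool needs. */
trait PooledClient {
  def isActive: Boolean
  def close(): Unit
}

/** Keeps at most one published client per (host, port); `connect` opens a new one. */
final class ClientPool(connect: (String, Int) => PooledClient) {
  private val pool = new ConcurrentHashMap[(String, Int), PooledClient]

  def get(host: String, port: Int): PooledClient = {
    val key = (host, port)
    var result: PooledClient = null
    while (result == null) {
      val cached = pool.get(key)
      if (cached != null && cached.isActive) {
        result = cached
      } else {
        // Connect outside the map so a slow connect does not block other keys.
        val fresh = connect(host, port)
        val won =
          if (cached == null) pool.putIfAbsent(key, fresh) == null
          else pool.replace(key, cached, fresh)
        if (won) {
          if (cached != null) cached.close()  // drop the inactive client we replaced
          result = fresh
        } else {
          fresh.close()  // another thread published first; loop and reuse its client
        }
      }
    }
    result
  }
}
```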


[GitHub] spark pull request: [SPARK-3453] [WIP] Refactor Netty module to us...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2330#issuecomment-57209924
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20986/consoleFull) for PR 2330 at commit [`f23e682`](https://github.com/apache/spark/commit/f23e6821b250157adb0f2a44762972efacf78ef9).
     * This patch merges cleanly.


[GitHub] spark pull request: [SPARK-3453] [WIP] Refactor Netty module to us...

Posted by colorant <gi...@git.apache.org>.
Github user colorant commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2330#discussion_r17650877
  
    --- Diff: core/src/main/scala/org/apache/spark/network/netty/BlockClientFactory.scala ---
    @@ -0,0 +1,182 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.network.netty
    +
    +import java.io.Closeable
    +import java.util.concurrent.{ConcurrentHashMap, TimeoutException}
    +
    +import io.netty.bootstrap.Bootstrap
    +import io.netty.buffer.PooledByteBufAllocator
    +import io.netty.channel._
    +import io.netty.channel.epoll.{Epoll, EpollEventLoopGroup, EpollSocketChannel}
    +import io.netty.channel.nio.NioEventLoopGroup
    +import io.netty.channel.oio.OioEventLoopGroup
    +import io.netty.channel.socket.SocketChannel
    +import io.netty.channel.socket.nio.NioSocketChannel
    +import io.netty.channel.socket.oio.OioSocketChannel
    +import io.netty.util.internal.PlatformDependent
    +
    +import org.apache.spark.{Logging, SparkConf}
    +import org.apache.spark.util.Utils
    +
    +
    +/**
    + * Factory for creating [[BlockClient]] by using createClient.
    + *
    + * The factory maintains a connection pool to other hosts and should return the same [[BlockClient]]
    + * for the same remote host. It also shares a single worker thread pool for all [[BlockClient]]s.
    + */
    +private[netty]
    +class BlockClientFactory(val conf: NettyConfig) extends Logging with Closeable {
    +
    +  def this(sparkConf: SparkConf) = this(new NettyConfig(sparkConf))
    +
    +  /** A thread factory so the threads are named (for debugging). */
    +  private[this] val threadFactory = Utils.namedThreadFactory("spark-netty-client")
    +
    +  /** Socket channel type, initialized by [[init]] depending on ioMode. */
    +  private[this] var socketChannelClass: Class[_ <: Channel] = _
    +
    +  /** Thread pool shared by all clients. */
    +  private[this] var workerGroup: EventLoopGroup = _
    +
    +  private[this] val connectionPool = new ConcurrentHashMap[(String, Int), BlockClient]
    +
    +  // The encoders are stateless and can be shared among multiple clients.
    +  private[this] val encoder = new ClientRequestEncoder
    +  private[this] val decoder = new ServerResponseDecoder
    +
    +  init()
    +
    +  /** Initialize [[socketChannelClass]] and [[workerGroup]] based on ioMode. */
    +  private def init(): Unit = {
    +    def initOio(): Unit = {
    +      socketChannelClass = classOf[OioSocketChannel]
    +      workerGroup = new OioEventLoopGroup(0, threadFactory)
    +    }
    +    def initNio(): Unit = {
    +      socketChannelClass = classOf[NioSocketChannel]
    +      workerGroup = new NioEventLoopGroup(0, threadFactory)
    +    }
    +    def initEpoll(): Unit = {
    +      socketChannelClass = classOf[EpollSocketChannel]
    +      workerGroup = new EpollEventLoopGroup(0, threadFactory)
    +    }
    +
    +    // For auto mode, first try epoll (only available on Linux), then nio.
    +    conf.ioMode match {
    +      case "nio" => initNio()
    +      case "oio" => initOio()
    +      case "epoll" => initEpoll()
    +      case "auto" => if (Epoll.isAvailable) initEpoll() else initNio()
    +    }
    +  }
    +
    +  /**
    +   * Create a new BlockFetchingClient connecting to the given remote host / port.
    +   *
    +   * This blocks until a connection is successfully established.
    +   *
    +   * Concurrency: This method is safe to call from multiple threads.
    +   */
    +  def createClient(remoteHost: String, remotePort: Int): BlockClient = {
    +    // Get connection from the connection pool first.
    +    // If it is not found or not active, create a new one.
    +    val cachedClient = connectionPool.get((remoteHost, remotePort))
    +    if (cachedClient != null && cachedClient.isActive) {
    +      return cachedClient
    +    }
    +
    +    logInfo(s"Creating new connection to $remoteHost:$remotePort")
    +
    +    // There is a chance two threads are creating two different clients connecting to the same host.
    +    // But that's probably ok ...
    +
    +    val handler = new BlockClientHandler
    --- End diff --
    
    Can this handler be shared across BlockClients?
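Whether a handler can be shared comes down to per-connection state. Netty only lets one handler instance sit in several pipelines if it is annotated `@Sharable`. The quoted factory already shares its stateless encoder and decoder across clients, but it builds a new `BlockClientHandler` per connection.

Below is a minimal sketch of why per-connection instances matter. It assumes, hypothetically, that the client handler tracks in-flight fetch callbacks by block id. With one instance per connection, a response can only complete a fetch issued on that connection, and a dropped connection fails only its own fetches. `OutstandingFetches` is a hypothetical name.

```scala
import scala.collection.mutable

/** Hypothetical per-connection handler state: callbacks for fetches still in flight. */
final class OutstandingFetches {
  private val pending = mutable.Map.empty[String, Either[String, Array[Byte]] => Unit]

  def add(blockId: String, callback: Either[String, Array[Byte]] => Unit): Unit =
    synchronized { pending(blockId) = callback }

  /** Complete the fetch for `blockId`; returns false if this connection never requested it. */
  def complete(blockId: String, result: Either[String, Array[Byte]]): Boolean = {
    val callback = synchronized { pending.remove(blockId) }
    callback.foreach(_(result))  // invoke outside the lock
    callback.isDefined
  }

  /** When this connection dies, fail only the fetches that were issued on it. */
  def failAll(error: String): Int = {
    val callbacks = synchronized {
      val all = pending.values.toList
      pending.clear()
      all
    }
    callbacks.foreach(_(Left(error)))
    callbacks.size
  }
}
```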


[GitHub] spark pull request: [SPARK-3453] [WIP] Refactor Netty module to us...

Posted by colorant <gi...@git.apache.org>.
Github user colorant commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2330#discussion_r17653172
  
    --- Diff: core/src/main/scala/org/apache/spark/network/netty/BlockClientFactory.scala ---
    @@ -0,0 +1,182 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.network.netty
    +
    +import java.io.Closeable
    +import java.util.concurrent.{ConcurrentHashMap, TimeoutException}
    +
    +import io.netty.bootstrap.Bootstrap
    +import io.netty.buffer.PooledByteBufAllocator
    +import io.netty.channel._
    +import io.netty.channel.epoll.{Epoll, EpollEventLoopGroup, EpollSocketChannel}
    +import io.netty.channel.nio.NioEventLoopGroup
    +import io.netty.channel.oio.OioEventLoopGroup
    +import io.netty.channel.socket.SocketChannel
    +import io.netty.channel.socket.nio.NioSocketChannel
    +import io.netty.channel.socket.oio.OioSocketChannel
    +import io.netty.util.internal.PlatformDependent
    +
    +import org.apache.spark.{Logging, SparkConf}
    +import org.apache.spark.util.Utils
    +
    +
    +/**
    + * Factory for creating [[BlockClient]] by using createClient.
    + *
    + * The factory maintains a connection pool to other hosts and should return the same [[BlockClient]]
    + * for the same remote host. It also shares a single worker thread pool for all [[BlockClient]]s.
    + */
    +private[netty]
    +class BlockClientFactory(val conf: NettyConfig) extends Logging with Closeable {
    +
    +  def this(sparkConf: SparkConf) = this(new NettyConfig(sparkConf))
    +
    +  /** A thread factory so the threads are named (for debugging). */
    +  private[this] val threadFactory = Utils.namedThreadFactory("spark-netty-client")
    +
    +  /** Socket channel type, initialized by [[init]] depending on ioMode. */
    +  private[this] var socketChannelClass: Class[_ <: Channel] = _
    +
    +  /** Thread pool shared by all clients. */
    +  private[this] var workerGroup: EventLoopGroup = _
    +
    +  private[this] val connectionPool = new ConcurrentHashMap[(String, Int), BlockClient]
    +
    +  // The encoders are stateless and can be shared among multiple clients.
    +  private[this] val encoder = new ClientRequestEncoder
    +  private[this] val decoder = new ServerResponseDecoder
    +
    +  init()
    +
    +  /** Initialize [[socketChannelClass]] and [[workerGroup]] based on ioMode. */
    +  private def init(): Unit = {
    +    def initOio(): Unit = {
    +      socketChannelClass = classOf[OioSocketChannel]
    +      workerGroup = new OioEventLoopGroup(0, threadFactory)
    +    }
    +    def initNio(): Unit = {
    +      socketChannelClass = classOf[NioSocketChannel]
    +      workerGroup = new NioEventLoopGroup(0, threadFactory)
    +    }
    +    def initEpoll(): Unit = {
    +      socketChannelClass = classOf[EpollSocketChannel]
    +      workerGroup = new EpollEventLoopGroup(0, threadFactory)
    +    }
    +
    +    // For auto mode, first try epoll (only available on Linux), then nio.
    +    conf.ioMode match {
    +      case "nio" => initNio()
    +      case "oio" => initOio()
    +      case "epoll" => initEpoll()
    +      case "auto" => if (Epoll.isAvailable) initEpoll() else initNio()
    +    }
    +  }
    +
    +  /**
    +   * Create a new BlockFetchingClient connecting to the given remote host / port.
    +   *
    +   * This blocks until a connection is successfully established.
    +   *
    +   * Concurrency: This method is safe to call from multiple threads.
    +   */
    +  def createClient(remoteHost: String, remotePort: Int): BlockClient = {
    +    // Get connection from the connection pool first.
    +    // If it is not found or not active, create a new one.
    +    val cachedClient = connectionPool.get((remoteHost, remotePort))
    +    if (cachedClient != null && cachedClient.isActive) {
    +      return cachedClient
    +    }
    +
    +    logInfo(s"Creating new connection to $remoteHost:$remotePort")
    +
    +    // There is a chance two threads are creating two different clients connecting to the same host.
    +    // But that's probably ok ...
    +
    +    val handler = new BlockClientHandler
    +
    +    val bootstrap = new Bootstrap
    +    bootstrap.group(workerGroup)
    +      .channel(socketChannelClass)
    +      // Disable Nagle's Algorithm since we don't want packets to wait
    +      .option(ChannelOption.TCP_NODELAY, java.lang.Boolean.TRUE)
    +      .option(ChannelOption.SO_KEEPALIVE, java.lang.Boolean.TRUE)
    +      .option[Integer](ChannelOption.CONNECT_TIMEOUT_MILLIS, conf.connectTimeoutMs)
    +
    +    // Use pooled buffers to reduce temporary buffer allocation
    +    bootstrap.option(ChannelOption.ALLOCATOR, createPooledByteBufAllocator())
    +
    +    bootstrap.handler(new ChannelInitializer[SocketChannel] {
    +      override def initChannel(ch: SocketChannel): Unit = {
    +        ch.pipeline
    +          .addLast("clientRequestEncoder", encoder)
    +          .addLast("frameDecoder", ProtocolUtils.createFrameDecoder())
    +          .addLast("serverResponseDecoder", decoder)
    +          .addLast("handler", handler)
    --- End diff --
    
    This handler might be time-consuming. If so, might it also need to run in a separate thread pool rather than the shared I/O pool?
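If handler callbacks can be slow, running them on the event-loop thread stalls every other channel that thread serves. Netty can bind a handler to its own executor group via `ChannelPipeline.addLast(group, name, handler)`, for example with a `DefaultEventExecutorGroup`.

Below is a minimal JDK-only sketch of the same idea; it is not what this PR implements. A single-thread executor stands in for the I/O thread and only hands each callback off to a separate worker pool. `IoThreadOffload` is a hypothetical name.

```scala
import java.util.concurrent.{ExecutorService, Executors, ThreadFactory, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

/** Hypothetical demo: an "I/O" thread that never runs slow callbacks itself. */
final class IoThreadOffload(workerThreads: Int) {
  /** Names threads `prefix-N` so they are easy to spot, and marks them daemon. */
  private def named(prefix: String): ThreadFactory = new ThreadFactory {
    private val counter = new AtomicInteger(0)
    override def newThread(r: Runnable): Thread = {
      val t = new Thread(r, s"$prefix-${counter.incrementAndGet()}")
      t.setDaemon(true)
      t
    }
  }

  // Stands in for a Netty event loop: one thread serving reads for many channels.
  private val ioThread: ExecutorService = Executors.newSingleThreadExecutor(named("io"))
  // Separate pool for callbacks that may block or take a long time.
  private val workers: ExecutorService =
    Executors.newFixedThreadPool(workerThreads, named("worker"))

  /** Simulates a message arriving: the I/O thread only hands the callback to a worker. */
  def onMessage(msg: String)(slowCallback: String => Unit): Unit = {
    ioThread.execute(new Runnable {
      override def run(): Unit = workers.execute(new Runnable {
        override def run(): Unit = slowCallback(msg)
      })
    })
  }

  /** Stop the I/O thread first so no hand-off races with the worker pool shutting down. */
  def shutdown(): Unit = {
    ioThread.shutdown()
    ioThread.awaitTermination(5, TimeUnit.SECONDS)
    workers.shutdown()
    workers.awaitTermination(5, TimeUnit.SECONDS)
  }
}
```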


[GitHub] spark pull request: [SPARK-3453] Netty-based BlockTransferService

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2330#discussion_r18240875
  
    --- Diff: core/src/main/scala/org/apache/spark/network/netty/BlockServer.scala ---
    @@ -121,18 +83,18 @@ class BlockServer(conf: NettyConfig, dataProvider: BlockDataProvider) extends Lo
           bootstrap.option[java.lang.Integer](ChannelOption.SO_BACKLOG, backLog)
         }
         conf.receiveBuf.foreach { receiveBuf =>
    -      bootstrap.option[java.lang.Integer](ChannelOption.SO_RCVBUF, receiveBuf)
    +      bootstrap.childOption[java.lang.Integer](ChannelOption.SO_RCVBUF, receiveBuf)
         }
         conf.sendBuf.foreach { sendBuf =>
    -      bootstrap.option[java.lang.Integer](ChannelOption.SO_SNDBUF, sendBuf)
    +      bootstrap.childOption[java.lang.Integer](ChannelOption.SO_SNDBUF, sendBuf)
    --- End diff --
    
    oops - thanks for catching that!
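For context on the fix: on Netty's `ServerBootstrap`, `option(...)` configures the listening (parent) channel, and `childOption(...)` configures each accepted connection. Per-connection buffer sizes belong on the accepted connections. Plain `java.nio` draws the same line: `ServerSocketChannel` supports `SO_RCVBUF` but not `SO_SNDBUF`.

Below is a minimal sketch, not Netty code, that accepts one loopback connection and applies both sizes to the accepted child. `ChildSocketOptions` is a hypothetical name, and the OS may round or cap the requested sizes.

```scala
import java.net.{InetAddress, InetSocketAddress, StandardSocketOptions}
import java.nio.channels.{ServerSocketChannel, SocketChannel}

// Hypothetical java.nio illustration of parent (listening) vs child (accepted) socket options.
object ChildSocketOptions {
  /**
   * Accept one loopback connection, set SO_SNDBUF/SO_RCVBUF on the accepted child socket,
   * and return (listener supports SO_SNDBUF?, child send buffer, child receive buffer).
   */
  def acceptWithBuffers(sendBuf: Int, receiveBuf: Int): (Boolean, Int, Int) = {
    val server = ServerSocketChannel.open()
    try {
      server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0))
      // The listening socket supports options such as SO_RCVBUF and SO_REUSEADDR, not SO_SNDBUF.
      val listenerHasSndBuf = server.supportedOptions().contains(StandardSocketOptions.SO_SNDBUF)

      val client = SocketChannel.open(server.getLocalAddress())  // blocking connect
      val child = server.accept()  // the per-connection ("child") socket
      try {
        child.setOption[Integer](StandardSocketOptions.SO_SNDBUF, sendBuf)
        child.setOption[Integer](StandardSocketOptions.SO_RCVBUF, receiveBuf)
        // The OS may round or cap the sizes, so read back what was actually applied.
        (listenerHasSndBuf,
          child.getOption(StandardSocketOptions.SO_SNDBUF).intValue(),
          child.getOption(StandardSocketOptions.SO_RCVBUF).intValue())
      } finally {
        child.close()
        client.close()
      }
    } finally {
      server.close()
    }
  }
}
```

One JDK caveat: the `ServerSocket.setReceiveBufferSize` docs note that receive windows larger than 64 KB require setting `SO_RCVBUF` on the listening socket before it is bound. The window scale is negotiated during the TCP handshake.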



[GitHub] spark pull request: [SPARK-3453] [WIP] Refactor Netty module to us...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on the pull request:

    https://github.com/apache/spark/pull/2330#issuecomment-55004934
  
    Jenkins, retest this please.


[GitHub] spark pull request: [SPARK-3453] Netty-based BlockTransferService

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2330#issuecomment-57276579
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21025/consoleFull) for PR 2330 at commit [`0dae310`](https://github.com/apache/spark/commit/0dae31022fa26abc806db94e02fa7c15a031d1c1).
     * This patch **fails** unit tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


[GitHub] spark pull request: [SPARK-3453] [WIP] Refactor Netty module to us...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2330#issuecomment-55008576
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20047/consoleFull) for PR 2330 at commit [`9b3b397`](https://github.com/apache/spark/commit/9b3b3973af78d5bcc680f46f8f162fb3d4bd69f8).
     * This patch merges cleanly.


[GitHub] spark pull request: [SPARK-3453] Refactor Netty module to use Bloc...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2330#issuecomment-57222690
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20990/consoleFull) for PR 2330 at commit [`dfc2c34`](https://github.com/apache/spark/commit/dfc2c34b8951f0b3c3256b6ed744d1d8ece78583).
     * This patch **fails** unit tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


[GitHub] spark pull request: [SPARK-3453] [WIP] Refactor Netty module to us...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2330#issuecomment-55472666
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/82/consoleFull) for PR 2330 at commit [`8295561`](https://github.com/apache/spark/commit/8295561a9befcfa3c2a56d8836e05a42ce4ab6b0).
     * This patch **passes** unit tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


[GitHub] spark pull request: [SPARK-3453] [WIP] Refactor Netty module to us...

Posted by colorant <gi...@git.apache.org>.
Github user colorant commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2330#discussion_r17460170
  
    --- Diff: core/src/main/scala/org/apache/spark/network/netty/protocol.scala ---
    @@ -0,0 +1,260 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.network.netty
    +
    +import java.util.{List => JList}
    +
    +import io.netty.buffer.ByteBuf
    +import io.netty.channel.ChannelHandlerContext
    +import io.netty.channel.ChannelHandler.Sharable
    +import io.netty.handler.codec._
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.network.{NettyManagedBuffer, ManagedBuffer}
    +
    +
    +/** Messages from the client to the server. */
    +sealed trait ClientRequest {
    +  def id: Byte
    +}
    +
    +/**
    + * Request to fetch a sequence of blocks from the server. A single [[BlockFetchRequest]] can
    + * correspond to multiple [[ServerResponse]]s.
    + */
    +final case class BlockFetchRequest(blocks: Seq[String]) extends ClientRequest {
    +  override def id = 0
    +}
    +
    +/**
    + * Request to upload a block to the server. Currently the server does not ack the upload request.
    + */
    +final case class BlockUploadRequest(blockId: String, data: ManagedBuffer) extends ClientRequest {
    +  require(blockId.length <= Byte.MaxValue)
    +  override def id = 1
    +}
    +
    +
    +/** Messages from server to client (usually in response to some [[ClientRequest]]). */
    +sealed trait ServerResponse {
    +  def id: Byte
    +}
    +
    +/** Response to [[BlockFetchRequest]] when a block exists and has been successfully fetched. */
    +final case class BlockFetchSuccess(blockId: String, data: ManagedBuffer) extends ServerResponse {
    +  require(blockId.length <= Byte.MaxValue)
    +  override def id = 0
    +}
    +
    +/** Response to [[BlockFetchRequest]] when there is an error fetching the block. */
    +final case class BlockFetchFailure(blockId: String, error: String) extends ServerResponse {
    +  require(blockId.length <= Byte.MaxValue)
    +  override def id = 1
    +}
    +
    +
    +/**
    + * Encoder for [[ClientRequest]] used in client side.
    + *
    + * This encoder is stateless so it is safe to be shared by multiple threads.
    + */
    +@Sharable
    +final class ClientRequestEncoder extends MessageToMessageEncoder[ClientRequest] {
    +  override def encode(ctx: ChannelHandlerContext, in: ClientRequest, out: JList[Object]): Unit = {
    +    in match {
    +      case BlockFetchRequest(blocks) =>
    +        // 8 bytes: frame size
    +        // 1 byte: BlockFetchRequest vs BlockUploadRequest
    +        // 4 bytes: num blocks
    +        // then for each block id write 1 byte for blockId.length and then blockId itself
    +        val frameLength = 8 + 1 + 4 + blocks.size + blocks.map(_.size).fold(0)(_ + _)
    +        val buf = ctx.alloc().buffer(frameLength)
    +
    +        buf.writeLong(frameLength)
    +        buf.writeByte(in.id)
    +        buf.writeInt(blocks.size)
    +        blocks.foreach { blockId =>
    +          ProtocolUtils.writeBlockId(buf, blockId)
    +        }
    +
    +        assert(buf.writableBytes() == 0)
    +        out.add(buf)
    +
    +      case BlockUploadRequest(blockId, data) =>
    +        // 8 bytes: frame size
    +        // 1 byte: msg id (BlockFetchRequest vs BlockUploadRequest)
    +        // 1 byte: blockId.length
    +        // data itself (length = frame size - headerLength, i.e. frame size - 8 - 1 - 1 - blockId.length)
    +        val headerLength = 8 + 1 + 1 + blockId.length
    +        val frameLength = headerLength + data.size
    +        val header = ctx.alloc().buffer(headerLength)
    +
    +        // Call this before we add header to out so in case of exceptions
    +        // we don't send anything at all.
    +        val body = data.convertToNetty()
    +
    +        header.writeLong(frameLength)
    +        header.writeByte(in.id)
    +        ProtocolUtils.writeBlockId(header, blockId)
    +
    +        assert(header.writableBytes() == 0)
    +        out.add(header)
    +        out.add(body)
    +    }
    +  }
    +}
    +
    +
    +/**
    + * Decoder in the server side to decode client requests.
    + * This decoder is stateless so it is safe to be shared by multiple threads.
    + *
    + * This assumes the inbound messages have been processed by a frame decoder created by
    + * [[ProtocolUtils.createFrameDecoder()]].
    + */
    +@Sharable
    +final class ClientRequestDecoder extends MessageToMessageDecoder[ByteBuf] {
    +  override protected def decode(ctx: ChannelHandlerContext, in: ByteBuf, out: JList[AnyRef]): Unit =
    +  {
    +    val msgTypeId = in.readByte()
    +    val decoded = msgTypeId match {
    +      case 0 =>  // BlockFetchRequest
    +        val numBlocks = in.readInt()
    +        val blockIds = Seq.fill(numBlocks) { ProtocolUtils.readBlockId(in) }
    +        BlockFetchRequest(blockIds)
    +
    +      case 1 =>  // BlockUploadRequest
    +        val blockId = ProtocolUtils.readBlockId(in)
    +        in.retain()  // retain the bytebuf so we don't recycle it immediately.
    +        BlockUploadRequest(blockId, new NettyManagedBuffer(in))
    +    }
    +
    +    assert(decoded.id == msgTypeId)
    +    out.add(decoded)
    +  }
    +}
    +
    +
    +/**
    + * Encoder used by the server side to encode server-to-client responses.
    + * This encoder is stateless so it is safe to be shared by multiple threads.
    + */
    +@Sharable
    +final class ServerResponseEncoder extends MessageToMessageEncoder[ServerResponse] with Logging {
    +  override def encode(ctx: ChannelHandlerContext, in: ServerResponse, out: JList[Object]): Unit = {
    +    in match {
    +      case BlockFetchSuccess(blockId, data) =>
    +        // Handle the body first so if we encounter an error getting the body, we can respond
    +        // with an error instead.
    +        var body: AnyRef = null
    +        try {
    +          body = data.convertToNetty()
    --- End diff --
    
    Do we need to call `data.retain()` here?
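The quoted `ServerResponseEncoder` writes a `BlockFetchSuccess` as a small header followed by the body returned by `convertToNetty()`. The sketch below shows that header layout and how the frame decoder's parameters relate to it. It makes a few assumptions: it uses `java.nio.ByteBuffer` in place of Netty's `ByteBuf` so it runs without Netty, and `FrameLayoutSketch` and its methods are hypothetical names. Unlike the quoted code, which sizes the block id with `blockId.length` (a character count), it uses the UTF-8 byte length. The two differ only for non-ASCII ids.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

// Hypothetical sketch of the BlockFetchSuccess header layout (not Spark code).
object FrameLayoutSketch {
  // Header written before the body of a BlockFetchSuccess (message id 0):
  //   8 bytes: frame length (counts the whole frame, including these 8 bytes)
  //   1 byte:  message id
  //   1 byte:  block id length, followed by the block id bytes
  def encodeFetchSuccessHeader(blockId: String, bodySize: Long): ByteBuffer = {
    val idBytes = blockId.getBytes(StandardCharsets.UTF_8)
    require(idBytes.length <= Byte.MaxValue, "block id must fit in a 1-byte length")
    val headerLength = 8 + 1 + 1 + idBytes.length
    val header = ByteBuffer.allocate(headerLength)
    header.putLong(headerLength + bodySize)
    header.put(0.toByte)
    header.put(idBytes.length.toByte)
    header.put(idBytes)
    header.flip()
    header
  }

  // Mirrors LengthFieldBasedFrameDecoder(Int.MaxValue, 0, 8, -8, 8): the length field
  // includes itself (adjustment -8), and the 8-byte field is stripped, so downstream
  // decoders see frameLength - 8 bytes.
  def payloadLength(frame: ByteBuffer): Long = frame.getLong(0) - 8
}
```

For a block id `rdd_0_1` and a 100-byte body, the header is 17 bytes and the frame length field holds 117. After the decoder strips the length field, 109 bytes remain: the message id, the id length, the 7-byte id, and the body.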


[GitHub] spark pull request: [SPARK-3453] Netty-based BlockTransferService

Posted by aarondav <gi...@git.apache.org>.
Github user aarondav commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2330#discussion_r18263556
  
    --- Diff: core/src/main/scala/org/apache/spark/network/BlockTransferService.scala ---
    @@ -83,7 +84,7 @@ abstract class BlockTransferService {
         val lock = new Object
         @volatile var result: Either[ManagedBuffer, Throwable] = null
    --- End diff --
    
    Out of curiosity, is there an advantage of this over `Try[ManagedBuffer]`?
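For comparison, here is a minimal sketch of the same lock plus `@volatile` result pattern storing a `Try[T]` instead of an `Either[T, Throwable]`. `BlockingResult` and `BlockingResultDemo` are hypothetical names, not part of the PR.

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical holder mirroring the quoted `lock` + `@volatile var result` pattern,
// but storing a Try[T] instead of an Either[T, Throwable].
final class BlockingResult[T] {
  private val lock = new Object
  @volatile private var result: Try[T] = null

  // Called by the callback thread with Success(value) or Failure(error).
  def complete(r: Try[T]): Unit = lock.synchronized {
    result = r
    lock.notifyAll()
  }

  // Blocks until complete() has been called, then returns the value
  // or rethrows the stored exception via Try.get.
  def await(): T = lock.synchronized {
    while (result == null) lock.wait()
    result.get
  }
}

object BlockingResultDemo {
  def main(args: Array[String]): Unit = {
    val ok = new BlockingResult[Int]
    new Thread(new Runnable { def run(): Unit = ok.complete(Success(42)) }).start()
    println(ok.await()) // prints 42

    val failed = new BlockingResult[Int]
    failed.complete(Failure(new RuntimeException("fetch failed")))
    println(Try(failed.await()).isFailure) // prints true
  }
}
```

Both encodings carry the same information. One difference is convention: `Either` conventionally holds the success value in `Right`, while the quoted field puts the buffer in `Left`. `Success`/`Failure` make those roles explicit, and `get` rethrows without a manual match. Note that `Try.apply` only catches non-fatal exceptions.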


[GitHub] spark pull request: [SPARK-3453] [WIP] Refactor Netty module to us...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2330#issuecomment-55851944
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20451/consoleFull) for   PR 2330 at commit [`a79a259`](https://github.com/apache/spark/commit/a79a25918a96171c4b20c5c9153e5815bc23698e).
     * This patch merges cleanly.


[GitHub] spark pull request: [SPARK-3453] [WIP] Refactor Netty module to us...

Posted by aarondav <gi...@git.apache.org>.
Github user aarondav commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2330#discussion_r17463888
  
    --- Diff: core/src/main/scala/org/apache/spark/network/netty/protocol.scala ---
    @@ -0,0 +1,260 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.network.netty
    +
    +import java.util.{List => JList}
    +
    +import io.netty.buffer.ByteBuf
    +import io.netty.channel.ChannelHandlerContext
    +import io.netty.channel.ChannelHandler.Sharable
    +import io.netty.handler.codec._
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.network.{NettyManagedBuffer, ManagedBuffer}
    +
    +
    +/** Messages from the client to the server. */
    +sealed trait ClientRequest {
    +  def id: Byte
    +}
    +
    +/**
    + * Request to fetch a sequence of blocks from the server. A single [[BlockFetchRequest]] can
    + * correspond to multiple [[ServerResponse]]s.
    + */
    +final case class BlockFetchRequest(blocks: Seq[String]) extends ClientRequest {
    +  override def id = 0
    +}
    +
    +/**
    + * Request to upload a block to the server. Currently the server does not ack the upload request.
    + */
    +final case class BlockUploadRequest(blockId: String, data: ManagedBuffer) extends ClientRequest {
    +  require(blockId.length <= Byte.MaxValue)
    +  override def id = 1
    +}
    +
    +
    +/** Messages from server to client (usually in response to some [[ClientRequest]]. */
    +sealed trait ServerResponse {
    +  def id: Byte
    +}
    +
    +/** Response to [[BlockFetchRequest]] when a block exists and has been successfully fetched. */
    +final case class BlockFetchSuccess(blockId: String, data: ManagedBuffer) extends ServerResponse {
    +  require(blockId.length <= Byte.MaxValue)
    +  override def id = 0
    +}
    +
    +/** Response to [[BlockFetchRequest]] when there is an error fetching the block. */
    +final case class BlockFetchFailure(blockId: String, error: String) extends ServerResponse {
    +  require(blockId.length <= Byte.MaxValue)
    +  override def id = 1
    +}
    +
    +
    +/**
    + * Encoder for [[ClientRequest]] used in client side.
    + *
    + * This encoder is stateless so it is safe to be shared by multiple threads.
    + */
    +@Sharable
    +final class ClientRequestEncoder extends MessageToMessageEncoder[ClientRequest] {
    +  override def encode(ctx: ChannelHandlerContext, in: ClientRequest, out: JList[Object]): Unit = {
    +    in match {
    +      case BlockFetchRequest(blocks) =>
    +        // 8 bytes: frame size
    +        // 1 byte: BlockFetchRequest vs BlockUploadRequest
    +        // 4 byte: num blocks
    +        // then for each block id write 1 byte for blockId.length and then blockId itself
    +        val frameLength = 8 + 1 + 4 + blocks.size + blocks.map(_.size).fold(0)(_ + _)
    +        val buf = ctx.alloc().buffer(frameLength)
    +
    +        buf.writeLong(frameLength)
    +        buf.writeByte(in.id)
    +        buf.writeInt(blocks.size)
    +        blocks.foreach { blockId =>
    +          ProtocolUtils.writeBlockId(buf, blockId)
    +        }
    +
    +        assert(buf.writableBytes() == 0)
    +        out.add(buf)
    +
    +      case BlockUploadRequest(blockId, data) =>
    +        // 8 bytes: frame size
    +        // 1 byte: msg id (BlockFetchRequest vs BlockUploadRequest)
    +        // 1 byte: blockId.length
    +        // data itself (length can be derived from: frame size - 1 - blockId.length)
    +        val headerLength = 8 + 1 + 1 + blockId.length
    +        val frameLength = headerLength + data.size
    +        val header = ctx.alloc().buffer(headerLength)
    +
    +        // Call this before we add header to out so in case of exceptions
    +        // we don't send anything at all.
    +        val body = data.convertToNetty()
    +
    +        header.writeLong(frameLength)
    +        header.writeByte(in.id)
    +        ProtocolUtils.writeBlockId(header, blockId)
    +
    +        assert(header.writableBytes() == 0)
    +        out.add(header)
    +        out.add(body)
    +    }
    +  }
    +}
    +
    +
    +/**
    + * Decoder in the server side to decode client requests.
    + * This decoder is stateless so it is safe to be shared by multiple threads.
    + *
    + * This assumes the inbound messages have been processed by a frame decoder created by
    + * [[ProtocolUtils.createFrameDecoder()]].
    + */
    +@Sharable
    +final class ClientRequestDecoder extends MessageToMessageDecoder[ByteBuf] {
    +  override protected def decode(ctx: ChannelHandlerContext, in: ByteBuf, out: JList[AnyRef]): Unit =
    +  {
    +    val msgTypeId = in.readByte()
    +    val decoded = msgTypeId match {
    +      case 0 =>  // BlockFetchRequest
    +        val numBlocks = in.readInt()
    +        val blockIds = Seq.fill(numBlocks) { ProtocolUtils.readBlockId(in) }
    +        BlockFetchRequest(blockIds)
    +
    +      case 1 =>  // BlockUploadRequest
    +        val blockId = ProtocolUtils.readBlockId(in)
    +        in.retain()  // retain the bytebuf so we don't recycle it immediately.
    +        BlockUploadRequest(blockId, new NettyManagedBuffer(in))
    +    }
    +
    +    assert(decoded.id == msgTypeId)
    +    out.add(decoded)
    +  }
    +}
    +
    +
    +/**
    + * Encoder used by the server side to encode server-to-client responses.
    + * This encoder is stateless so it is safe to be shared by multiple threads.
    + */
    +@Sharable
    +final class ServerResponseEncoder extends MessageToMessageEncoder[ServerResponse] with Logging {
    +  override def encode(ctx: ChannelHandlerContext, in: ServerResponse, out: JList[Object]): Unit = {
    +    in match {
    +      case BlockFetchSuccess(blockId, data) =>
    +        // Handle the body first so if we encounter an error getting the body, we can respond
    +        // with an error instead.
    +        var body: AnyRef = null
    +        try {
    +          body = data.convertToNetty()
    +        } catch {
    +          case e: Exception =>
    +            // Re-encode this message as BlockFetchFailure.
    +            logError(s"Error opening block $blockId for client ${ctx.channel.remoteAddress}", e)
    +            encode(ctx, new BlockFetchFailure(blockId, e.getMessage), out)
    +            return
    +        }
    +
    +        // If we got here, body cannot be null
    +        // 8 bytes = long for frame length
    +        // 1 byte = message id (type)
    +        // 1 byte = block id length
    +        // followed by block id itself
    +        val headerLength = 8 + 1 + 1 + blockId.length
    +        val frameLength = headerLength + data.size
    +        val header = ctx.alloc().buffer(headerLength)
    +        header.writeLong(frameLength)
    +        header.writeByte(in.id)
    +        ProtocolUtils.writeBlockId(header, blockId)
    +
    +        assert(header.writableBytes() == 0)
    +        out.add(header)
    +        out.add(body)
    +
    +      case BlockFetchFailure(blockId, error) =>
    +        val frameLength = 8 + 1 + 1 + blockId.length + error.length
    +        val buf = ctx.alloc().buffer(frameLength)
    +        buf.writeLong(frameLength)
    +        buf.writeByte(in.id)
    +        ProtocolUtils.writeBlockId(buf, blockId)
    +        buf.writeBytes(error.getBytes)
    +
    +        assert(buf.writableBytes() == 0)
    +        out.add(buf)
    +    }
    +  }
    +}
    +
    +
    +/**
    + * Decoder in the client side to decode server responses.
    + * This decoder is stateless so it is safe to be shared by multiple threads.
    + *
    + * This assumes the inbound messages have been processed by a frame decoder created by
    + * [[ProtocolUtils.createFrameDecoder()]].
    + */
    +@Sharable
    +final class ServerResponseDecoder extends MessageToMessageDecoder[ByteBuf] {
    +  override def decode(ctx: ChannelHandlerContext, in: ByteBuf, out: JList[AnyRef]): Unit = {
    +    val msgId = in.readByte()
    +    val decoded = msgId match {
    +      case 0 =>  // BlockFetchSuccess
    +        val blockId = ProtocolUtils.readBlockId(in)
    +        in.retain()
    +        new BlockFetchSuccess(blockId, new NettyManagedBuffer(in))
    +
    +      case 1 =>  // BlockFetchFailure
    +        val blockId = ProtocolUtils.readBlockId(in)
    +        val errorBytes = new Array[Byte](in.readableBytes())
    +        in.readBytes(errorBytes)
    +        new BlockFetchFailure(blockId, new String(errorBytes))
    +    }
    +
    +    assert(decoded.id == msgId)
    +    out.add(decoded)
    +  }
    +}
    +
    +
    +private[netty] object ProtocolUtils {
    +
    +  /** LengthFieldBasedFrameDecoder used before all decoders. */
    +  def createFrameDecoder(): ByteToMessageDecoder = {
    +    // maxFrameLength = 2G
    +    // lengthFieldOffset = 0
    +    // lengthFieldLength = 8
    +    // lengthAdjustment = -8, i.e. exclude the 8 byte length itself
    +    // initialBytesToStrip = 8, i.e. strip out the length field itself
    +    new LengthFieldBasedFrameDecoder(Int.MaxValue, 0, 8, -8, 8)
    +  }
    +
    +  // TODO(rxin): Make sure these work for all charsets.
    +  def readBlockId(in: ByteBuf): String = {
    +    val numBytesToRead = in.readByte().toInt
    --- End diff --
    
    Maybe add a comment noting that this is guaranteed to be < 128.
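Here is a minimal sketch of a length-prefixed block id codec that carries the suggested comment. It assumes `java.nio.ByteBuffer` in place of Netty's `ByteBuf`, and UTF-8 as the charset, since the quoted code leaves the charset as a TODO. `BlockIdCodecSketch` is a hypothetical name. The write side checks the encoded byte length, so the read side can rely on the length byte being in 0..127.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

// Hypothetical codec for a 1-byte length prefix followed by UTF-8 block id bytes.
object BlockIdCodecSketch {
  def writeBlockId(buf: ByteBuffer, blockId: String): Unit = {
    val bytes = blockId.getBytes(StandardCharsets.UTF_8)
    // The encoded length must fit in one signed byte: 0..127 (Byte.MaxValue).
    require(bytes.length <= Byte.MaxValue, s"block id too long: ${bytes.length} bytes")
    buf.put(bytes.length.toByte)
    buf.put(bytes)
  }

  def readBlockId(buf: ByteBuffer): String = {
    // Guaranteed to be < 128 by writeBlockId, so the signed byte is never negative.
    val numBytesToRead = buf.get().toInt
    val bytes = new Array[Byte](numBytesToRead)
    buf.get(bytes)
    new String(bytes, StandardCharsets.UTF_8)
  }
}
```

Because `require` runs before anything is written, an oversized id fails without leaving a partial entry in the buffer.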


[GitHub] spark pull request: [SPARK-3453] [WIP] Refactor Netty module to us...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2330#issuecomment-55216364
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20137/consoleFull) for   PR 2330 at commit [`d135fa3`](https://github.com/apache/spark/commit/d135fa38801467b0dd870063c00103ddd45438c7).
     * This patch merges cleanly.


[GitHub] spark pull request: [SPARK-3453] [WIP] Refactor Netty module to us...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2330#discussion_r18175957
  
    --- Diff: core/src/main/scala/org/apache/spark/network/netty/protocol.scala ---
    @@ -0,0 +1,260 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.network.netty
    +
    +import java.util.{List => JList}
    +
    +import io.netty.buffer.ByteBuf
    +import io.netty.channel.ChannelHandlerContext
    +import io.netty.channel.ChannelHandler.Sharable
    +import io.netty.handler.codec._
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.network.{NettyManagedBuffer, ManagedBuffer}
    +
    +
    +/** Messages from the client to the server. */
    +sealed trait ClientRequest {
    +  def id: Byte
    +}
    +
    +/**
    + * Request to fetch a sequence of blocks from the server. A single [[BlockFetchRequest]] can
    + * correspond to multiple [[ServerResponse]]s.
    + */
    +final case class BlockFetchRequest(blocks: Seq[String]) extends ClientRequest {
    +  override def id = 0
    +}
    +
    +/**
    + * Request to upload a block to the server. Currently the server does not ack the upload request.
    + */
    +final case class BlockUploadRequest(blockId: String, data: ManagedBuffer) extends ClientRequest {
    +  require(blockId.length <= Byte.MaxValue)
    +  override def id = 1
    +}
    +
    +
    +/** Messages from server to client (usually in response to some [[ClientRequest]]. */
    +sealed trait ServerResponse {
    +  def id: Byte
    +}
    +
    +/** Response to [[BlockFetchRequest]] when a block exists and has been successfully fetched. */
    +final case class BlockFetchSuccess(blockId: String, data: ManagedBuffer) extends ServerResponse {
    +  require(blockId.length <= Byte.MaxValue)
    +  override def id = 0
    +}
    +
    +/** Response to [[BlockFetchRequest]] when there is an error fetching the block. */
    +final case class BlockFetchFailure(blockId: String, error: String) extends ServerResponse {
    +  require(blockId.length <= Byte.MaxValue)
    +  override def id = 1
    +}
    +
    +
    +/**
    + * Encoder for [[ClientRequest]] used in client side.
    + *
    + * This encoder is stateless so it is safe to be shared by multiple threads.
    + */
    +@Sharable
    +final class ClientRequestEncoder extends MessageToMessageEncoder[ClientRequest] {
    +  override def encode(ctx: ChannelHandlerContext, in: ClientRequest, out: JList[Object]): Unit = {
    +    in match {
    +      case BlockFetchRequest(blocks) =>
    +        // 8 bytes: frame size
    +        // 1 byte: BlockFetchRequest vs BlockUploadRequest
    +        // 4 byte: num blocks
    +        // then for each block id write 1 byte for blockId.length and then blockId itself
    +        val frameLength = 8 + 1 + 4 + blocks.size + blocks.map(_.size).fold(0)(_ + _)
    +        val buf = ctx.alloc().buffer(frameLength)
    +
    +        buf.writeLong(frameLength)
    +        buf.writeByte(in.id)
    +        buf.writeInt(blocks.size)
    +        blocks.foreach { blockId =>
    +          ProtocolUtils.writeBlockId(buf, blockId)
    +        }
    +
    +        assert(buf.writableBytes() == 0)
    +        out.add(buf)
    +
    +      case BlockUploadRequest(blockId, data) =>
    +        // 8 bytes: frame size
    +        // 1 byte: msg id (BlockFetchRequest vs BlockUploadRequest)
    +        // 1 byte: blockId.length
    +        // data itself (length can be derived from: frame size - 1 - blockId.length)
    +        val headerLength = 8 + 1 + 1 + blockId.length
    +        val frameLength = headerLength + data.size
    +        val header = ctx.alloc().buffer(headerLength)
    +
    +        // Call this before we add header to out so in case of exceptions
    +        // we don't send anything at all.
    +        val body = data.convertToNetty()
    +
    +        header.writeLong(frameLength)
    +        header.writeByte(in.id)
    +        ProtocolUtils.writeBlockId(header, blockId)
    +
    +        assert(header.writableBytes() == 0)
    +        out.add(header)
    +        out.add(body)
    +    }
    +  }
    +}
    +
    +
    +/**
    + * Decoder in the server side to decode client requests.
    + * This decoder is stateless so it is safe to be shared by multiple threads.
    + *
    + * This assumes the inbound messages have been processed by a frame decoder created by
    + * [[ProtocolUtils.createFrameDecoder()]].
    + */
    +@Sharable
    +final class ClientRequestDecoder extends MessageToMessageDecoder[ByteBuf] {
    +  override protected def decode(ctx: ChannelHandlerContext, in: ByteBuf, out: JList[AnyRef]): Unit =
    +  {
    +    val msgTypeId = in.readByte()
    +    val decoded = msgTypeId match {
    +      case 0 =>  // BlockFetchRequest
    +        val numBlocks = in.readInt()
    +        val blockIds = Seq.fill(numBlocks) { ProtocolUtils.readBlockId(in) }
    +        BlockFetchRequest(blockIds)
    +
    +      case 1 =>  // BlockUploadRequest
    +        val blockId = ProtocolUtils.readBlockId(in)
    +        in.retain()  // retain the bytebuf so we don't recycle it immediately.
    +        BlockUploadRequest(blockId, new NettyManagedBuffer(in))
    +    }
    +
    +    assert(decoded.id == msgTypeId)
    +    out.add(decoded)
    +  }
    +}
    +
    +
    +/**
    + * Encoder used by the server side to encode server-to-client responses.
    + * This encoder is stateless so it is safe to be shared by multiple threads.
    + */
    +@Sharable
    +final class ServerResponseEncoder extends MessageToMessageEncoder[ServerResponse] with Logging {
    +  override def encode(ctx: ChannelHandlerContext, in: ServerResponse, out: JList[Object]): Unit = {
    +    in match {
    +      case BlockFetchSuccess(blockId, data) =>
    +        // Handle the body first so if we encounter an error getting the body, we can respond
    +        // with an error instead.
    +        var body: AnyRef = null
    +        try {
    +          body = data.convertToNetty()
    --- End diff --
    
    The initial reference count is actually 1.
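Netty's `ReferenceCounted` objects start with a reference count of 1. `retain()` increments the count, and `release()` decrements it, deallocating the object when the count reaches 0. A freshly created body therefore already holds one reference for whoever consumes it. That is presumably why no extra `retain()` is needed when the new object is handed to the pipeline. The toy class below is a hypothetical illustration that models only those rules. It is not Netty's implementation.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical toy model of Netty-style reference counting; not Netty's classes.
final class ToyRefCounted(onDeallocate: () => Unit) {
  // A newly created object starts with one reference, owned by its creator.
  private val refCnt = new AtomicInteger(1)

  def count: Int = refCnt.get()

  // Adds a reference; fails if the object was already deallocated.
  def retain(): this.type = {
    while (true) {
      val c = refCnt.get()
      if (c <= 0) throw new IllegalStateException("already released")
      if (refCnt.compareAndSet(c, c + 1)) return this
    }
    this // unreachable; satisfies the type checker
  }

  // Drops a reference; deallocates and returns true when the count reaches 0.
  def release(): Boolean = {
    while (true) {
      val c = refCnt.get()
      if (c <= 0) throw new IllegalStateException("already released")
      if (refCnt.compareAndSet(c, c - 1)) {
        if (c == 1) onDeallocate()
        return c == 1
      }
    }
    false // unreachable
  }
}
```

In the toy, an object that is created and then released once is deallocated immediately. Only a second, independent owner would need to call `retain()`.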


[GitHub] spark pull request: [SPARK-3453] Netty-based BlockTransferService

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2330#discussion_r18263961
  
    --- Diff: core/src/main/scala/org/apache/spark/network/BlockDataManager.scala ---
    @@ -20,14 +20,14 @@ package org.apache.spark.network
     import org.apache.spark.storage.StorageLevel
     
     
    +private[spark]
     trait BlockDataManager {
     
       /**
    -   * Interface to get local block data.
    -   *
    -   * @return Some(buffer) if the block exists locally, and None if it doesn't.
    +   * Interface to get local block data. Throws an exception if the block cannot be found or
    +   * cannot be read successfully.
        */
    -  def getBlockData(blockId: String): Option[ManagedBuffer]
    +  def getBlockData(blockId: String): ManagedBuffer
    --- End diff --
    
    See the PR description: the TODOs will be addressed in separate PRs.
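With the quoted change, `getBlockData` signals a missing or unreadable block by throwing instead of returning `None`. A caller that still wants the old `Option`-style result could wrap the call in `Try`, as in this sketch. `BlockDataAdapterSketch`, `BlockSource`, `getBlockDataOption`, and the `Array[Byte]` stand-in for `ManagedBuffer` are all hypothetical. Note that wrapping discards the exception. The throwing signature, by contrast, lets a server report the error message, for example in a `BlockFetchFailure`.

```scala
import scala.util.Try

object BlockDataAdapterSketch {
  // Hypothetical stand-in for the new BlockDataManager contract:
  // returns the block's data or throws if it cannot be found or read.
  trait BlockSource {
    def getBlockData(blockId: String): Array[Byte]
  }

  // Recovers the old Option-returning behavior. Try catches only non-fatal
  // exceptions, so errors such as OutOfMemoryError still propagate.
  def getBlockDataOption(src: BlockSource, blockId: String): Option[Array[Byte]] =
    Try(src.getBlockData(blockId)).toOption
}
```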


[GitHub] spark pull request: [SPARK-3453] [WIP] Refactor Netty module to us...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2330#issuecomment-55468445
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/82/consoleFull) for   PR 2330 at commit [`8295561`](https://github.com/apache/spark/commit/8295561a9befcfa3c2a56d8836e05a42ce4ab6b0).
     * This patch merges cleanly.


[GitHub] spark pull request: [SPARK-3453] [WIP] Refactor Netty module to us...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2330#issuecomment-55483917
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20256/consoleFull) for   PR 2330 at commit [`29fe0cc`](https://github.com/apache/spark/commit/29fe0cc9ea6ca1285d054b335d989d1131aa4b69).
     * This patch **passes** unit tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


[GitHub] spark pull request: [SPARK-3453] [WIP] Refactor Netty module to us...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2330#issuecomment-55219321
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20137/consoleFull) for   PR 2330 at commit [`d135fa3`](https://github.com/apache/spark/commit/d135fa38801467b0dd870063c00103ddd45438c7).
     * This patch **fails** unit tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


[GitHub] spark pull request: [SPARK-3453] Refactor Netty module to use Bloc...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/2330#issuecomment-57220174
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/20986/


[GitHub] spark pull request: [SPARK-3453] Refactor Netty module to use Bloc...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2330#issuecomment-57213019
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20988/consoleFull) for   PR 2330 at commit [`ca88068`](https://github.com/apache/spark/commit/ca88068553a01d8e9c4ee9bd8dd519775a527787).
     * This patch merges cleanly.


[GitHub] spark pull request: [SPARK-3453] Refactor Netty module to use Bloc...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2330#issuecomment-57259269
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21010/consoleFull) for   PR 2330 at commit [`bc9ed22`](https://github.com/apache/spark/commit/bc9ed22d4d9fd36599a076a0a201a1809ea3a24c).
     * This patch merges cleanly.


[GitHub] spark pull request: [SPARK-3453] Refactor Netty module to use Bloc...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/2330#issuecomment-57259324
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21010/



[GitHub] spark pull request: [SPARK-3453] Netty-based BlockTransferService

Posted by rxin <gi...@git.apache.org>.
Github user rxin closed the pull request at:

    https://github.com/apache/spark/pull/2330



[GitHub] spark pull request: [SPARK-3453] Netty-based BlockTransferService

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2330#issuecomment-57371768
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/216/consoleFull) for PR 2330 at commit [`bdab2c7`](https://github.com/apache/spark/commit/bdab2c74111c8bce382323f68732f87ca9b080a9).
     * This patch merges cleanly.



[GitHub] spark pull request: [SPARK-3453] [WIP] Refactor Netty module to us...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2330#issuecomment-55213524
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/54/consoleFull) for PR 2330 at commit [`b32c3fe`](https://github.com/apache/spark/commit/b32c3fe568163614a6eb424e523f7dd545d8ce9e).
     * This patch merges cleanly.



[GitHub] spark pull request: [SPARK-3453] [WIP] Refactor Netty module to us...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2330#discussion_r17285939
  
    --- Diff: core/src/main/scala/org/apache/spark/network/netty/BlockClientHandler.scala ---
    @@ -0,0 +1,86 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.network.netty
    +
    +import io.netty.channel.{ChannelHandlerContext, SimpleChannelInboundHandler}
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.network.BlockFetchingListener
    +
    +
    +/**
    + * Handler that processes server responses.
    + *
    + * Concurrency: thread safe and can be called from multiple threads.
    + */
    +private[netty]
    +class BlockClientHandler extends SimpleChannelInboundHandler[ServerResponse] with Logging {
    +
    +  /** Tracks the list of outstanding requests and their listeners on success/failure. */
    +  private val outstandingRequests = java.util.Collections.synchronizedMap {
    +    new java.util.HashMap[String, BlockFetchingListener]
    +  }
    +
    +  def addRequest(blockId: String, listener: BlockFetchingListener): Unit = {
    +    outstandingRequests.put(blockId, listener)
    +  }
    +
    +  def removeRequest(blockId: String): Unit = {
    +    outstandingRequests.remove(blockId)
    +  }
    +
    +  override def exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable): Unit = {
    +    val errorMsg = s"Exception in connection from ${ctx.channel.remoteAddress}: ${cause.getMessage}"
    +    logError(errorMsg, cause)
    +
    +    // Fire the failure callback for all outstanding blocks
    +    outstandingRequests.synchronized {
    +      val iter = outstandingRequests.entrySet().iterator()
    +      while (iter.hasNext) {
    +        val entry = iter.next()
    +        entry.getValue.onBlockFetchFailure(cause)
    +      }
    +      outstandingRequests.clear()
    +    }
    +
    +    ctx.close()
    +  }
    +
    +  override def channelRead0(ctx: ChannelHandlerContext, response: ServerResponse) {
    +    val server = ctx.channel.remoteAddress.toString
    +    response match {
    +      case BlockFetchSuccess(blockId, buf) =>
    +        val listener = outstandingRequests.get(blockId)
    +        if (listener == null) {
    +          logWarning(s"Got a response for block $blockId from $server but it is not outstanding")
    +        } else {
    +          outstandingRequests.remove(blockId)
    +          listener.onBlockFetchSuccess(blockId, buf)
    --- End diff --
    
    Note to self: we probably need to retain the buffer somehow; otherwise it could get recycled by another thread.
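
A minimal sketch of the concern in the note above, using hypothetical stand-ins (`RefCountedBuf`, `FetchListener`, `OutstandingRequests`; none of these are Netty or Spark classes). It assumes the framework drops its own reference to the buffer once the read callback returns, as Netty's `SimpleChannelInboundHandler` does for reference-counted messages when auto-release is enabled. The handler therefore calls `retain()` before handing the buffer to the listener, and the listener becomes responsible for the matching `release()`; a buffer whose count reaches zero may be recycled.

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical stand-in for a pooled, reference-counted buffer (NOT Netty's ByteBuf API).
final class RefCountedBuf(val bytes: Array[Byte]) {
  private var refs = 1
  def refCount: Int = synchronized(refs)
  def retain(): RefCountedBuf = synchronized {
    // Once the count has hit zero the memory may already belong to someone else.
    require(refs > 0, "buffer already released and possibly recycled")
    refs += 1
    this
  }
  /** Drops one reference; returns true if this call freed the buffer. */
  def release(): Boolean = synchronized {
    require(refs > 0, "buffer released too many times")
    refs -= 1
    refs == 0
  }
}

// Hypothetical listener, loosely modeled on BlockFetchingListener in the diff above.
trait FetchListener {
  def onBlockFetchSuccess(blockId: String, buf: RefCountedBuf): Unit
  def onBlockFetchFailure(cause: Throwable): Unit
}

// Hypothetical client-side bookkeeping; a sketch, not the code from the PR.
final class OutstandingRequests {
  private val pending = new ConcurrentHashMap[String, FetchListener]()

  def add(blockId: String, listener: FetchListener): Unit = {
    pending.put(blockId, listener)
  }

  /** Called with a buffer that the caller releases after this method returns. */
  def onSuccess(blockId: String, buf: RefCountedBuf): Boolean = {
    val listener = pending.remove(blockId) // atomic lookup-and-remove
    if (listener == null) false // not outstanding: ignore (the PR logs a warning)
    else {
      buf.retain() // this extra reference belongs to the listener, which must release it
      listener.onBlockFetchSuccess(blockId, buf)
      true
    }
  }

  /** Fails every outstanding request, e.g. when the connection breaks. */
  def failAll(cause: Throwable): Unit = {
    val it = pending.entrySet().iterator()
    while (it.hasNext) {
      val entry = it.next()
      it.remove()
      entry.getValue.onBlockFetchFailure(cause)
    }
  }

  def size: Int = pending.size()
}
```

Using `ConcurrentHashMap.remove` looks up and removes the listener in one atomic step, whereas the diff calls `get` and then `remove` on a synchronized map as two separate operations. `failAll` plays the role of the loop in `exceptionCaught`.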



[GitHub] spark pull request: [SPARK-3453] Netty-based BlockTransferService

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2330#issuecomment-57286803
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21032/consoleFull) for PR 2330 at commit [`ad09236`](https://github.com/apache/spark/commit/ad092361f649b82dff64c44a30b50af1e9cccc0c).
     * This patch **passes** unit tests.
     * This patch merges cleanly.
     * This patch adds no public classes.



[GitHub] spark pull request: [SPARK-3453] Netty-based BlockTransferService

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2330#discussion_r18214345
  
    --- Diff: core/src/main/scala/org/apache/spark/network/netty/BlockServer.scala ---
    @@ -74,42 +54,24 @@ class BlockServer(conf: NettyConfig, dataProvider: BlockDataProvider) extends Lo
       /** Initialize the server. */
       private def init(): Unit = {
         bootstrap = new ServerBootstrap
    -    val bossThreadFactory = Utils.namedThreadFactory("spark-shuffle-server-boss")
    -    val workerThreadFactory = Utils.namedThreadFactory("spark-shuffle-server-worker")
    +    val threadFactory = Utils.namedThreadFactory("spark-netty-server")
     
         // Use only one thread to accept connections, and 2 * num_cores for worker.
         def initNio(): Unit = {
    -      val bossGroup = new NioEventLoopGroup(1, bossThreadFactory)
    -      val workerGroup = new NioEventLoopGroup(0, workerThreadFactory)
    -      workerGroup.setIoRatio(conf.ioRatio)
    +      val bossGroup = new NioEventLoopGroup(conf.serverThreads, threadFactory)
    +      val workerGroup = bossGroup
           bootstrap.group(bossGroup, workerGroup).channel(classOf[NioServerSocketChannel])
         }
    -    def initOio(): Unit = {
    -      val bossGroup = new OioEventLoopGroup(1, bossThreadFactory)
    -      val workerGroup = new OioEventLoopGroup(0, workerThreadFactory)
    -      bootstrap.group(bossGroup, workerGroup).channel(classOf[OioServerSocketChannel])
    -    }
         def initEpoll(): Unit = {
    -      val bossGroup = new EpollEventLoopGroup(1, bossThreadFactory)
    -      val workerGroup = new EpollEventLoopGroup(0, workerThreadFactory)
    -      workerGroup.setIoRatio(conf.ioRatio)
    +      val bossGroup = new EpollEventLoopGroup(conf.serverThreads, threadFactory)
    +      val workerGroup = bossGroup
    --- End diff --
    
    If you keep the boss and worker groups the same, I think it's sufficient to shut down just one of the two in `close()`. But maybe it's more future-proof to keep it as it is. ;)
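
The reviewer's point can be illustrated with hypothetical stand-ins (`ToyEventLoopGroup`, `ToyServer`; not Netty classes, although the method is named after Netty's `shutdownGracefully` for readability). A reference check in `close()` shuts a shared group down once, while still covering the case where the boss and worker groups are split apart again later, which is one way to get both properties the comment weighs.

```scala
// Hypothetical stand-in for an event loop group; NOT Netty's EventLoopGroup API.
final class ToyEventLoopGroup {
  var shutdownCalls = 0
  def shutdownGracefully(): Unit = shutdownCalls += 1
}

// Hypothetical server mirroring the diff: one group may serve as both boss and worker.
final class ToyServer(shareGroups: Boolean) {
  val bossGroup = new ToyEventLoopGroup
  val workerGroup: ToyEventLoopGroup = if (shareGroups) bossGroup else new ToyEventLoopGroup

  /** Shuts down each distinct group exactly once, whether or not the groups are shared. */
  def close(): Unit = {
    bossGroup.shutdownGracefully()
    // `ne` is reference inequality: skip the worker group if it is the same object.
    if (workerGroup ne bossGroup) workerGroup.shutdownGracefully()
  }
}
```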



[GitHub] spark pull request: [SPARK-3453] Refactor Netty module to use Bloc...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2330#issuecomment-57215066
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20990/consoleFull) for PR 2330 at commit [`dfc2c34`](https://github.com/apache/spark/commit/dfc2c34b8951f0b3c3256b6ed744d1d8ece78583).
     * This patch merges cleanly.

