Posted to commits@spark.apache.org by rx...@apache.org on 2014/10/29 19:27:17 UTC

[4/4] git commit: [SPARK-3453] Netty-based BlockTransferService, extracted from Spark core

[SPARK-3453] Netty-based BlockTransferService, extracted from Spark core

This PR encapsulates #2330, which is itself a continuation of #2240. The first goal of this PR is to provide an alternate, simpler, Netty-based implementation of the ConnectionManager.

Beyond this goal, we also want to resolve [SPARK-3796](https://issues.apache.org/jira/browse/SPARK-3796), which calls for a standalone shuffle service that can be integrated into the YARN NodeManager, the Standalone Worker, or run on its own. This PR takes the first step in that direction by keeping the actual Netty service as small as possible and extracting it from Spark core. Given this, we should be able to build a standalone jar that can be included in other JVMs without incurring significant dependency or runtime issues. The actual work to make such a standalone shuffle service usable in Spark is left for a future PR, however.

In order to minimize dependencies and allow the service to be long-running (possibly much longer-running than Spark, and possibly having to support multiple versions of Spark simultaneously), the entire service has been ported to Java, where we have full control over the binary compatibility of the components and do not depend on the Scala runtime or version.

The following issues have been addressed by folding in #2330:

SPARK-3453: Refactor Netty module to use BlockTransferService interface
SPARK-3018: Release all buffers upon task completion/failure
SPARK-3002: Create a connection pool and reuse clients across different threads
SPARK-3017: Integration tests and unit tests for connection failures
SPARK-3049: Make sure client doesn't block when server/connection has error(s)
SPARK-3502: SO_RCVBUF and SO_SNDBUF should be bootstrap childOption, not option
SPARK-3503: Disable thread local cache in PooledByteBufAllocator

TODO before mergeable:
- [x] Implement uploadBlock()
- [x] Unit tests for RPC side of code
- [x] Performance testing (see comments [here](https://github.com/apache/spark/pull/2753#issuecomment-59475022))
- [x] Turn OFF by default (currently on for unit testing)

Author: Reynold Xin <rx...@apache.org>
Author: Aaron Davidson <aa...@databricks.com>
Author: cocoatomo <co...@gmail.com>
Author: Patrick Wendell <pw...@gmail.com>
Author: Prashant Sharma <pr...@imaginea.com>
Author: Davies Liu <da...@gmail.com>
Author: Anand Avati <av...@redhat.com>

Closes #2753 from aarondav/netty and squashes the following commits:

cadfd28 [Aaron Davidson] Turn netty off by default
d7be11b [Aaron Davidson] Turn netty on by default
4a204b8 [Aaron Davidson] Fail block fetches if client connection fails
2b0d1c0 [Aaron Davidson] 100ch
0c5bca2 [Aaron Davidson] Merge branch 'master' of https://github.com/apache/spark into netty
14e37f7 [Aaron Davidson] Address Reynold's comments
8dfcceb [Aaron Davidson] Merge branch 'master' of https://github.com/apache/spark into netty
322dfc1 [Aaron Davidson] Address Reynold's comments, including major rename
e5675a4 [Aaron Davidson] Fail outstanding RPCs as well
ccd4959 [Aaron Davidson] Don't throw exception if client immediately fails
9da0bc1 [Aaron Davidson] Add RPC unit tests
d236dfd [Aaron Davidson] Remove no-op serializer :)
7b7a26c [Aaron Davidson] Fix Nio compile issue
dd420fd [Aaron Davidson] Merge branch 'master' of https://github.com/apache/spark into netty-test
939f276 [Aaron Davidson] Attempt to make comm. bidirectional
aa58f67 [cocoatomo] [SPARK-3909][PySpark][Doc] A corrupted format in Sphinx documents and building warnings
8dc1ded [cocoatomo] [SPARK-3867][PySpark] ./python/run-tests failed when it run with Python 2.6 and unittest2 is not installed
5b5dbe6 [Prashant Sharma] [SPARK-2924] Required by scala 2.11, only one fun/ctor amongst overriden alternatives, can have default argument(s).
2c5d9dc [Patrick Wendell] HOTFIX: Fix build issue with Akka 2.3.4 upgrade.
020691e [Davies Liu] [SPARK-3886] [PySpark] use AutoBatchedSerializer by default
ae4083a [Anand Avati] [SPARK-2805] Upgrade Akka to 2.3.4
29c6dcf [Aaron Davidson] [SPARK-3453] Netty-based BlockTransferService, extracted from Spark core
f7e7568 [Reynold Xin] Fixed spark.shuffle.io.receiveBuffer setting.
5d98ce3 [Reynold Xin] Flip buffer.
f6c220d [Reynold Xin] Merge with latest master.
407e59a [Reynold Xin] Fix style violation.
a0518c7 [Reynold Xin] Implemented block uploads.
4b18db2 [Reynold Xin] Copy the buffer in fetchBlockSync.
bec4ea2 [Reynold Xin] Removed OIO and added num threads settings.
1bdd7ee [Reynold Xin] Fixed tests.
d68f328 [Reynold Xin] Logging close() in case close() fails.
f63fb4c [Reynold Xin] Add more debug message.
6afc435 [Reynold Xin] Added logging.
c066309 [Reynold Xin] Implement java.io.Closeable interface.
519d64d [Reynold Xin] Mark private package visibility and MimaExcludes.
f0a16e9 [Reynold Xin] Fixed test hanging.
14323a5 [Reynold Xin] Removed BlockManager.getLocalShuffleFromDisk.
b2f3281 [Reynold Xin] Added connection pooling.
d23ed7b [Reynold Xin] Incorporated feedback from Norman: - use same pool for boss and worker - remove ioratio - disable caching of byte buf allocator - childoption sendbuf/receivebuf - fire exception through pipeline
9e0cb87 [Reynold Xin] Fixed BlockClientHandlerSuite
5cd33d7 [Reynold Xin] Fixed style violation.
cb589ec [Reynold Xin] Added more test cases covering cleanup when fault happens in ShuffleBlockFetcherIteratorSuite
1be4e8e [Reynold Xin] Shorten NioManagedBuffer and NettyManagedBuffer class names.
108c9ed [Reynold Xin] Forgot to add TestSerializer to the commit list.
b5c8d1f [Reynold Xin] Fixed ShuffleBlockFetcherIteratorSuite.
064747b [Reynold Xin] Reference count buffers and clean them up properly.
2b44cf1 [Reynold Xin] Added more documentation.
1760d32 [Reynold Xin] Use Epoll.isAvailable in BlockServer as well.
165eab1 [Reynold Xin] [SPARK-3453] Refactor Netty module to use BlockTransferService.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dff01553
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dff01553
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dff01553

Branch: refs/heads/master
Commit: dff015533dd7b01b5e392f1ac5f3837e0a65f3f4
Parents: 51ce997
Author: Reynold Xin <rx...@apache.org>
Authored: Wed Oct 29 11:27:07 2014 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Wed Oct 29 11:27:07 2014 -0700

----------------------------------------------------------------------
 core/pom.xml                                    |   5 +
 .../main/scala/org/apache/spark/SparkEnv.scala  |   9 +-
 .../apache/spark/network/BlockDataManager.scala |  14 +-
 .../spark/network/BlockFetchingListener.scala   |  11 +-
 .../spark/network/BlockTransferService.scala    |  28 +-
 .../apache/spark/network/ManagedBuffer.scala    | 166 ------------
 .../spark/network/netty/NettyBlockFetcher.scala |  95 +++++++
 .../network/netty/NettyBlockRpcServer.scala     |  76 ++++++
 .../netty/NettyBlockTransferService.scala       | 111 ++++++++
 .../spark/network/netty/NettyConfig.scala       |  59 -----
 .../spark/network/netty/PathResolver.scala      |  25 --
 .../netty/client/BlockClientListener.scala      |  29 ---
 .../netty/client/BlockFetchingClient.scala      | 132 ----------
 .../client/BlockFetchingClientFactory.scala     |  99 -------
 .../client/BlockFetchingClientHandler.scala     | 104 --------
 .../network/netty/client/LazyInitIterator.scala |  44 ----
 .../netty/client/ReferenceCountedBuffer.scala   |  47 ----
 .../network/netty/server/BlockHeader.scala      |  32 ---
 .../netty/server/BlockHeaderEncoder.scala       |  47 ----
 .../network/netty/server/BlockServer.scala      | 162 ------------
 .../server/BlockServerChannelInitializer.scala  |  40 ---
 .../netty/server/BlockServerHandler.scala       | 140 ----------
 .../network/nio/NioBlockTransferService.scala   |  51 ++--
 .../apache/spark/serializer/Serializer.scala    |   6 +-
 .../spark/shuffle/FileShuffleBlockManager.scala |   4 +-
 .../shuffle/IndexShuffleBlockManager.scala      |   2 +-
 .../spark/shuffle/ShuffleBlockManager.scala     |   3 +-
 .../spark/storage/BlockDataProvider.scala       |  32 ---
 .../org/apache/spark/storage/BlockManager.scala |  52 ++--
 .../spark/storage/BlockNotFoundException.scala  |   1 -
 .../storage/ShuffleBlockFetcherIterator.scala   | 135 +++++++---
 .../scala/org/apache/spark/util/Utils.scala     |   1 -
 .../org/apache/spark/ShuffleNettySuite.scala    |   4 +-
 .../netty/ServerClientIntegrationSuite.scala    | 161 ------------
 .../BlockFetchingClientHandlerSuite.scala       | 106 --------
 .../netty/server/BlockHeaderEncoderSuite.scala  |  64 -----
 .../netty/server/BlockServerHandlerSuite.scala  | 107 --------
 .../spark/serializer/TestSerializer.scala       |  60 +++++
 .../shuffle/hash/HashShuffleManagerSuite.scala  |   8 +-
 .../ShuffleBlockFetcherIteratorSuite.scala      | 261 +++++++++++--------
 network/common/pom.xml                          |  94 +++++++
 .../apache/spark/network/TransportContext.java  | 117 +++++++++
 .../buffer/FileSegmentManagedBuffer.java        | 154 +++++++++++
 .../spark/network/buffer/ManagedBuffer.java     |  71 +++++
 .../network/buffer/NettyManagedBuffer.java      |  76 ++++++
 .../spark/network/buffer/NioManagedBuffer.java  |  75 ++++++
 .../client/ChunkFetchFailureException.java      |  31 +++
 .../network/client/ChunkReceivedCallback.java   |  47 ++++
 .../network/client/RpcResponseCallback.java     |  30 +++
 .../spark/network/client/TransportClient.java   | 159 +++++++++++
 .../network/client/TransportClientFactory.java  | 182 +++++++++++++
 .../client/TransportResponseHandler.java        | 167 ++++++++++++
 .../network/protocol/ChunkFetchFailure.java     |  76 ++++++
 .../network/protocol/ChunkFetchRequest.java     |  66 +++++
 .../network/protocol/ChunkFetchSuccess.java     |  80 ++++++
 .../spark/network/protocol/Encodable.java       |  41 +++
 .../apache/spark/network/protocol/Message.java  |  58 +++++
 .../spark/network/protocol/MessageDecoder.java  |  70 +++++
 .../spark/network/protocol/MessageEncoder.java  |  80 ++++++
 .../spark/network/protocol/RequestMessage.java  |  25 ++
 .../spark/network/protocol/ResponseMessage.java |  25 ++
 .../spark/network/protocol/RpcFailure.java      |  74 ++++++
 .../spark/network/protocol/RpcRequest.java      |  81 ++++++
 .../spark/network/protocol/RpcResponse.java     |  72 +++++
 .../spark/network/protocol/StreamChunkId.java   |  73 ++++++
 .../network/server/DefaultStreamManager.java    | 104 ++++++++
 .../spark/network/server/MessageHandler.java    |  36 +++
 .../apache/spark/network/server/RpcHandler.java |  38 +++
 .../spark/network/server/StreamManager.java     |  52 ++++
 .../network/server/TransportChannelHandler.java |  96 +++++++
 .../network/server/TransportRequestHandler.java | 162 ++++++++++++
 .../spark/network/server/TransportServer.java   | 121 +++++++++
 .../spark/network/util/ConfigProvider.java      |  52 ++++
 .../org/apache/spark/network/util/IOMode.java   |  27 ++
 .../apache/spark/network/util/JavaUtils.java    |  38 +++
 .../apache/spark/network/util/NettyUtils.java   | 102 ++++++++
 .../spark/network/util/TransportConf.java       |  61 +++++
 .../network/ChunkFetchIntegrationSuite.java     | 217 +++++++++++++++
 .../apache/spark/network/NoOpRpcHandler.java    |  28 ++
 .../org/apache/spark/network/ProtocolSuite.java |  86 ++++++
 .../spark/network/RpcIntegrationSuite.java      | 175 +++++++++++++
 .../network/SystemPropertyConfigProvider.java   |  34 +++
 .../apache/spark/network/TestManagedBuffer.java | 104 ++++++++
 .../org/apache/spark/network/TestUtils.java     |  30 +++
 .../network/TransportClientFactorySuite.java    | 102 ++++++++
 .../network/TransportResponseHandlerSuite.java  | 115 ++++++++
 pom.xml                                         |   1 +
 project/MimaExcludes.scala                      |   5 +
 88 files changed, 4512 insertions(+), 1831 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/dff01553/core/pom.xml
----------------------------------------------------------------------
diff --git a/core/pom.xml b/core/pom.xml
index 5cd21e1..8020a2d 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -45,6 +45,11 @@
       </exclusions>
     </dependency>
     <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>network</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
       <groupId>net.java.dev.jets3t</groupId>
       <artifactId>jets3t</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/spark/blob/dff01553/core/src/main/scala/org/apache/spark/SparkEnv.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 5c076e5..6a6dfda 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -32,6 +32,7 @@ import org.apache.spark.api.python.PythonWorkerFactory
 import org.apache.spark.broadcast.BroadcastManager
 import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.network.BlockTransferService
+import org.apache.spark.network.netty.{NettyBlockTransferService}
 import org.apache.spark.network.nio.NioBlockTransferService
 import org.apache.spark.scheduler.LiveListenerBus
 import org.apache.spark.serializer.Serializer
@@ -272,7 +273,13 @@ object SparkEnv extends Logging {
 
     val shuffleMemoryManager = new ShuffleMemoryManager(conf)
 
-    val blockTransferService = new NioBlockTransferService(conf, securityManager)
+    val blockTransferService =
+      conf.get("spark.shuffle.blockTransferService", "nio").toLowerCase match {
+        case "netty" =>
+          new NettyBlockTransferService(conf)
+        case "nio" =>
+          new NioBlockTransferService(conf, securityManager)
+      }
 
     val blockManagerMaster = new BlockManagerMaster(registerOrLookup(
       "BlockManagerMaster",

http://git-wip-us.apache.org/repos/asf/spark/blob/dff01553/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala b/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala
index e0e9172..1745d52 100644
--- a/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala
+++ b/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala
@@ -17,20 +17,20 @@
 
 package org.apache.spark.network
 
-import org.apache.spark.storage.StorageLevel
-
+import org.apache.spark.network.buffer.ManagedBuffer
+import org.apache.spark.storage.{BlockId, StorageLevel}
 
+private[spark]
 trait BlockDataManager {
 
   /**
-   * Interface to get local block data.
-   *
-   * @return Some(buffer) if the block exists locally, and None if it doesn't.
+   * Interface to get local block data. Throws an exception if the block cannot be found or
+   * cannot be read successfully.
    */
-  def getBlockData(blockId: String): Option[ManagedBuffer]
+  def getBlockData(blockId: BlockId): ManagedBuffer
 
   /**
    * Put the block locally, using the given storage level.
    */
-  def putBlockData(blockId: String, data: ManagedBuffer, level: StorageLevel): Unit
+  def putBlockData(blockId: BlockId, data: ManagedBuffer, level: StorageLevel): Unit
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/dff01553/core/src/main/scala/org/apache/spark/network/BlockFetchingListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/BlockFetchingListener.scala b/core/src/main/scala/org/apache/spark/network/BlockFetchingListener.scala
index 34acaa5..645793f 100644
--- a/core/src/main/scala/org/apache/spark/network/BlockFetchingListener.scala
+++ b/core/src/main/scala/org/apache/spark/network/BlockFetchingListener.scala
@@ -19,19 +19,24 @@ package org.apache.spark.network
 
 import java.util.EventListener
 
+import org.apache.spark.network.buffer.ManagedBuffer
+
 
 /**
  * Listener callback interface for [[BlockTransferService.fetchBlocks]].
  */
+private[spark]
 trait BlockFetchingListener extends EventListener {
 
   /**
-   * Called once per successfully fetched block.
+   * Called once per successfully fetched block. After this call returns, data will be released
+   * automatically. If the data will be passed to another thread, the receiver should retain()
+   * and release() the buffer on their own, or copy the data to a new buffer.
    */
   def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit
 
   /**
-   * Called upon failures. For each failure, this is called only once (i.e. not once per block).
+   * Called at least once per block upon failures.
    */
-  def onBlockFetchFailure(exception: Throwable): Unit
+  def onBlockFetchFailure(blockId: String, exception: Throwable): Unit
 }
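
As a rough illustration (not part of this commit) of the buffer-ownership contract described above, a listener that hands data to another thread should copy it before returning, since the transport may release the ManagedBuffer once onBlockFetchSuccess returns:

    import java.nio.ByteBuffer

    import org.apache.spark.network.BlockFetchingListener
    import org.apache.spark.network.buffer.ManagedBuffer

    // Hypothetical listener: copies each block's bytes so they stay valid after
    // the transport releases the underlying buffer.
    val listener = new BlockFetchingListener {
      override def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit = {
        val copy = ByteBuffer.allocate(data.size.toInt)
        copy.put(data.nioByteBuffer())
        copy.flip()
        // hand `copy` off to another thread here
      }

      override def onBlockFetchFailure(blockId: String, exception: Throwable): Unit = {
        // called at least once per failed block
      }
    }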

http://git-wip-us.apache.org/repos/asf/spark/blob/dff01553/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala
index 84d991f..b083f46 100644
--- a/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala
+++ b/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala
@@ -17,13 +17,19 @@
 
 package org.apache.spark.network
 
+import java.io.Closeable
+import java.nio.ByteBuffer
+
 import scala.concurrent.{Await, Future}
 import scala.concurrent.duration.Duration
 
-import org.apache.spark.storage.StorageLevel
-
+import org.apache.spark.Logging
+import org.apache.spark.network.buffer.{NioManagedBuffer, ManagedBuffer}
+import org.apache.spark.storage.{BlockId, StorageLevel}
+import org.apache.spark.util.Utils
 
-abstract class BlockTransferService {
+private[spark]
+abstract class BlockTransferService extends Closeable with Logging {
 
   /**
    * Initialize the transfer service by giving it the BlockDataManager that can be used to fetch
@@ -34,7 +40,7 @@ abstract class BlockTransferService {
   /**
    * Tear down the transfer service.
    */
-  def stop(): Unit
+  def close(): Unit
 
   /**
    * Port number the service is listening on, available only after [[init]] is invoked.
@@ -50,9 +56,6 @@ abstract class BlockTransferService {
    * Fetch a sequence of blocks from a remote node asynchronously,
    * available only after [[init]] is invoked.
    *
-   * Note that [[BlockFetchingListener.onBlockFetchSuccess]] is called once per block,
-   * while [[BlockFetchingListener.onBlockFetchFailure]] is called once per failure (not per block).
-   *
    * Note that this API takes a sequence so the implementation can batch requests, and does not
    * return a future so the underlying implementation can invoke onBlockFetchSuccess as soon as
    * the data of a block is fetched, rather than waiting for all blocks to be fetched.
@@ -69,7 +72,7 @@ abstract class BlockTransferService {
   def uploadBlock(
       hostname: String,
       port: Int,
-      blockId: String,
+      blockId: BlockId,
       blockData: ManagedBuffer,
       level: StorageLevel): Future[Unit]
 
@@ -83,7 +86,7 @@ abstract class BlockTransferService {
     val lock = new Object
     @volatile var result: Either[ManagedBuffer, Throwable] = null
     fetchBlocks(hostName, port, Seq(blockId), new BlockFetchingListener {
-      override def onBlockFetchFailure(exception: Throwable): Unit = {
+      override def onBlockFetchFailure(blockId: String, exception: Throwable): Unit = {
         lock.synchronized {
           result = Right(exception)
           lock.notify()
@@ -91,7 +94,10 @@ abstract class BlockTransferService {
       }
       override def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit = {
         lock.synchronized {
-          result = Left(data)
+          val ret = ByteBuffer.allocate(data.size.toInt)
+          ret.put(data.nioByteBuffer())
+          ret.flip()
+          result = Left(new NioManagedBuffer(ret))
           lock.notify()
         }
       }
@@ -123,7 +129,7 @@ abstract class BlockTransferService {
   def uploadBlockSync(
       hostname: String,
       port: Int,
-      blockId: String,
+      blockId: BlockId,
       blockData: ManagedBuffer,
       level: StorageLevel): Unit = {
     Await.result(uploadBlock(hostname, port, blockId, blockData, level), Duration.Inf)
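
A quick sketch (not part of this commit) of calling the synchronous helper shown above, assuming `service` is an initialized BlockTransferService and that fetchBlockSync keeps its existing (hostName, port, blockId) signature; the host, port, and block id are placeholders:

    import org.apache.spark.network.BlockTransferService

    def readRemoteBlock(service: BlockTransferService): Unit = {
      // Blocking fetch of a single block; per the change above, the returned
      // buffer is backed by a freshly copied NIO ByteBuffer.
      val buffer = service.fetchBlockSync("remote-host", 12345, "shuffle_0_0_0")
      val bytes = buffer.nioByteBuffer()
      println(s"fetched ${bytes.remaining()} bytes")
    }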

http://git-wip-us.apache.org/repos/asf/spark/blob/dff01553/core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala b/core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala
deleted file mode 100644
index 4211ba4..0000000
--- a/core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network
-
-import java.io._
-import java.nio.ByteBuffer
-import java.nio.channels.FileChannel
-import java.nio.channels.FileChannel.MapMode
-
-import scala.util.Try
-
-import com.google.common.io.ByteStreams
-import io.netty.buffer.{ByteBufInputStream, ByteBuf}
-
-import org.apache.spark.util.{ByteBufferInputStream, Utils}
-
-
-/**
- * This interface provides an immutable view for data in the form of bytes. The implementation
- * should specify how the data is provided:
- *
- * - FileSegmentManagedBuffer: data backed by part of a file
- * - NioByteBufferManagedBuffer: data backed by a NIO ByteBuffer
- * - NettyByteBufManagedBuffer: data backed by a Netty ByteBuf
- */
-sealed abstract class ManagedBuffer {
-  // Note that all the methods are defined with parenthesis because their implementations can
-  // have side effects (io operations).
-
-  /** Number of bytes of the data. */
-  def size: Long
-
-  /**
-   * Exposes this buffer's data as an NIO ByteBuffer. Changing the position and limit of the
-   * returned ByteBuffer should not affect the content of this buffer.
-   */
-  def nioByteBuffer(): ByteBuffer
-
-  /**
-   * Exposes this buffer's data as an InputStream. The underlying implementation does not
-   * necessarily check for the length of bytes read, so the caller is responsible for making sure
-   * it does not go over the limit.
-   */
-  def inputStream(): InputStream
-}
-
-
-/**
- * A [[ManagedBuffer]] backed by a segment in a file
- */
-final class FileSegmentManagedBuffer(val file: File, val offset: Long, val length: Long)
-  extends ManagedBuffer {
-
-  /**
-   * Memory mapping is expensive and can destabilize the JVM (SPARK-1145, SPARK-3889).
-   * Avoid unless there's a good reason not to.
-   */
-  private val MIN_MEMORY_MAP_BYTES = 2 * 1024 * 1024;
-
-  override def size: Long = length
-
-  override def nioByteBuffer(): ByteBuffer = {
-    var channel: FileChannel = null
-    try {
-      channel = new RandomAccessFile(file, "r").getChannel
-      // Just copy the buffer if it's sufficiently small, as memory mapping has a high overhead.
-      if (length < MIN_MEMORY_MAP_BYTES) {
-        val buf = ByteBuffer.allocate(length.toInt)
-        channel.position(offset)
-        while (buf.remaining() != 0) {
-          if (channel.read(buf) == -1) {
-            throw new IOException("Reached EOF before filling buffer\n" +
-              s"offset=$offset\nfile=${file.getAbsolutePath}\nbuf.remaining=${buf.remaining}")
-          }
-        }
-        buf.flip()
-        buf
-      } else {
-        channel.map(MapMode.READ_ONLY, offset, length)
-      }
-    } catch {
-      case e: IOException =>
-        Try(channel.size).toOption match {
-          case Some(fileLen) =>
-            throw new IOException(s"Error in reading $this (actual file length $fileLen)", e)
-          case None =>
-            throw new IOException(s"Error in opening $this", e)
-        }
-    } finally {
-      if (channel != null) {
-        Utils.tryLog(channel.close())
-      }
-    }
-  }
-
-  override def inputStream(): InputStream = {
-    var is: FileInputStream = null
-    try {
-      is = new FileInputStream(file)
-      ByteStreams.skipFully(is, offset)
-      ByteStreams.limit(is, length)
-    } catch {
-      case e: IOException =>
-        if (is != null) {
-          Utils.tryLog(is.close())
-        }
-        Try(file.length).toOption match {
-          case Some(fileLen) =>
-            throw new IOException(s"Error in reading $this (actual file length $fileLen)", e)
-          case None =>
-            throw new IOException(s"Error in opening $this", e)
-        }
-      case e: Throwable =>
-        if (is != null) {
-          Utils.tryLog(is.close())
-        }
-        throw e
-    }
-  }
-
-  override def toString: String = s"${getClass.getName}($file, $offset, $length)"
-}
-
-
-/**
- * A [[ManagedBuffer]] backed by [[java.nio.ByteBuffer]].
- */
-final class NioByteBufferManagedBuffer(buf: ByteBuffer) extends ManagedBuffer {
-
-  override def size: Long = buf.remaining()
-
-  override def nioByteBuffer() = buf.duplicate()
-
-  override def inputStream() = new ByteBufferInputStream(buf)
-}
-
-
-/**
- * A [[ManagedBuffer]] backed by a Netty [[ByteBuf]].
- */
-final class NettyByteBufManagedBuffer(buf: ByteBuf) extends ManagedBuffer {
-
-  override def size: Long = buf.readableBytes()
-
-  override def nioByteBuffer() = buf.nioBuffer()
-
-  override def inputStream() = new ByteBufInputStream(buf)
-
-  // TODO(rxin): Promote this to top level ManagedBuffer interface and add documentation for it.
-  def release(): Unit = buf.release()
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/dff01553/core/src/main/scala/org/apache/spark/network/netty/NettyBlockFetcher.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockFetcher.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockFetcher.scala
new file mode 100644
index 0000000..8c5ffd8
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockFetcher.scala
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.netty
+
+import java.nio.ByteBuffer
+import java.util
+
+import org.apache.spark.{SparkConf, Logging}
+import org.apache.spark.network.BlockFetchingListener
+import org.apache.spark.network.netty.NettyMessages._
+import org.apache.spark.serializer.{JavaSerializer, Serializer}
+import org.apache.spark.network.buffer.ManagedBuffer
+import org.apache.spark.network.client.{RpcResponseCallback, ChunkReceivedCallback, TransportClient}
+import org.apache.spark.storage.BlockId
+import org.apache.spark.util.Utils
+
+/**
+ * Responsible for holding the state for a request for a single set of blocks. This assumes that
+ * the chunks will be returned in the same order as requested, and that there will be exactly
+ * one chunk per block.
+ *
+ * Upon receipt of any block, the listener will be called back. Upon failure part way through,
+ * the listener will receive a failure callback for each outstanding block.
+ */
+class NettyBlockFetcher(
+    serializer: Serializer,
+    client: TransportClient,
+    blockIds: Seq[String],
+    listener: BlockFetchingListener)
+  extends Logging {
+
+  require(blockIds.nonEmpty)
+
+  private val ser = serializer.newInstance()
+
+  private var streamHandle: ShuffleStreamHandle = _
+
+  private val chunkCallback = new ChunkReceivedCallback {
+    // On receipt of a chunk, pass it upwards as a block.
+    def onSuccess(chunkIndex: Int, buffer: ManagedBuffer): Unit = Utils.logUncaughtExceptions {
+      listener.onBlockFetchSuccess(blockIds(chunkIndex), buffer)
+    }
+
+    // On receipt of a failure, fail every block from chunkIndex onwards.
+    def onFailure(chunkIndex: Int, e: Throwable): Unit = {
+      blockIds.drop(chunkIndex).foreach { blockId =>
+        listener.onBlockFetchFailure(blockId, e);
+      }
+    }
+  }
+
+  /** Begins the fetching process, calling the listener with every block fetched. */
+  def start(): Unit = {
+    // Send the RPC to open the given set of blocks. This will return a ShuffleStreamHandle.
+    client.sendRpc(ser.serialize(OpenBlocks(blockIds.map(BlockId.apply))).array(),
+      new RpcResponseCallback {
+        override def onSuccess(response: Array[Byte]): Unit = {
+          try {
+            streamHandle = ser.deserialize[ShuffleStreamHandle](ByteBuffer.wrap(response))
+            logTrace(s"Successfully opened block set: $streamHandle! Preparing to fetch chunks.")
+
+            // Immediately request all chunks -- we expect that the total size of the request is
+            // reasonable due to higher level chunking in [[ShuffleBlockFetcherIterator]].
+            for (i <- 0 until streamHandle.numChunks) {
+              client.fetchChunk(streamHandle.streamId, i, chunkCallback)
+            }
+          } catch {
+            case e: Exception =>
+              logError("Failed while starting block fetches", e)
+              blockIds.foreach(blockId => Utils.tryLog(listener.onBlockFetchFailure(blockId, e)))
+          }
+        }
+
+        override def onFailure(e: Throwable): Unit = {
+          logError("Failed while starting block fetches", e)
+          blockIds.foreach(blockId => Utils.tryLog(listener.onBlockFetchFailure(blockId, e)))
+        }
+      })
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/dff01553/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala
new file mode 100644
index 0000000..02c657e
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.netty
+
+import java.nio.ByteBuffer
+
+import org.apache.spark.Logging
+import org.apache.spark.network.BlockDataManager
+import org.apache.spark.serializer.Serializer
+import org.apache.spark.network.buffer.{NioManagedBuffer, ManagedBuffer}
+import org.apache.spark.network.client.{TransportClient, RpcResponseCallback}
+import org.apache.spark.network.server.{DefaultStreamManager, RpcHandler}
+import org.apache.spark.storage.{StorageLevel, BlockId}
+
+import scala.collection.JavaConversions._
+
+object NettyMessages {
+
+  /** Request to read a set of blocks. Returns [[ShuffleStreamHandle]] to identify the stream. */
+  case class OpenBlocks(blockIds: Seq[BlockId])
+
+  /** Request to upload a block with a certain StorageLevel. Returns nothing (empty byte array). */
+  case class UploadBlock(blockId: BlockId, blockData: Array[Byte], level: StorageLevel)
+
+  /** Identifier for a fixed number of chunks to read from a stream created by [[OpenBlocks]]. */
+  case class ShuffleStreamHandle(streamId: Long, numChunks: Int)
+}
+
+/**
+ * Serves requests to open blocks by simply registering one chunk per block requested.
+ */
+class NettyBlockRpcServer(
+    serializer: Serializer,
+    streamManager: DefaultStreamManager,
+    blockManager: BlockDataManager)
+  extends RpcHandler with Logging {
+
+  import NettyMessages._
+
+  override def receive(
+      client: TransportClient,
+      messageBytes: Array[Byte],
+      responseContext: RpcResponseCallback): Unit = {
+    val ser = serializer.newInstance()
+    val message = ser.deserialize[AnyRef](ByteBuffer.wrap(messageBytes))
+    logTrace(s"Received request: $message")
+
+    message match {
+      case OpenBlocks(blockIds) =>
+        val blocks: Seq[ManagedBuffer] = blockIds.map(blockManager.getBlockData)
+        val streamId = streamManager.registerStream(blocks.iterator)
+        logTrace(s"Registered streamId $streamId with ${blocks.size} buffers")
+        responseContext.onSuccess(
+          ser.serialize(new ShuffleStreamHandle(streamId, blocks.size)).array())
+
+      case UploadBlock(blockId, blockData, level) =>
+        blockManager.putBlockData(blockId, new NioManagedBuffer(ByteBuffer.wrap(blockData)), level)
+        responseContext.onSuccess(new Array[Byte](0))
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/dff01553/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
new file mode 100644
index 0000000..38a3e94
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.netty
+
+import scala.concurrent.{Promise, Future}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.network._
+import org.apache.spark.network.buffer.ManagedBuffer
+import org.apache.spark.network.client.{RpcResponseCallback, TransportClient, TransportClientFactory}
+import org.apache.spark.network.netty.NettyMessages.UploadBlock
+import org.apache.spark.network.server._
+import org.apache.spark.network.util.{ConfigProvider, TransportConf}
+import org.apache.spark.serializer.JavaSerializer
+import org.apache.spark.storage.{BlockId, StorageLevel}
+import org.apache.spark.util.Utils
+
+/**
+ * A BlockTransferService that uses Netty to fetch a set of blocks at a time.
+ */
+class NettyBlockTransferService(conf: SparkConf) extends BlockTransferService {
+  // TODO: Don't use Java serialization, use a more cross-version compatible serialization format.
+  val serializer = new JavaSerializer(conf)
+
+  // Create a TransportConfig using SparkConf.
+  private[this] val transportConf = new TransportConf(
+    new ConfigProvider { override def get(name: String) = conf.get(name) })
+
+  private[this] var transportContext: TransportContext = _
+  private[this] var server: TransportServer = _
+  private[this] var clientFactory: TransportClientFactory = _
+
+  override def init(blockDataManager: BlockDataManager): Unit = {
+    val streamManager = new DefaultStreamManager
+    val rpcHandler = new NettyBlockRpcServer(serializer, streamManager, blockDataManager)
+    transportContext = new TransportContext(transportConf, streamManager, rpcHandler)
+    clientFactory = transportContext.createClientFactory()
+    server = transportContext.createServer()
+  }
+
+  override def fetchBlocks(
+      hostname: String,
+      port: Int,
+      blockIds: Seq[String],
+      listener: BlockFetchingListener): Unit = {
+    try {
+      val client = clientFactory.createClient(hostname, port)
+      new NettyBlockFetcher(serializer, client, blockIds, listener).start()
+    } catch {
+      case e: Exception =>
+        logError("Exception while beginning fetchBlocks", e)
+        blockIds.foreach(listener.onBlockFetchFailure(_, e))
+    }
+  }
+
+  override def hostName: String = Utils.localHostName()
+
+  override def port: Int = server.getPort
+
+  override def uploadBlock(
+      hostname: String,
+      port: Int,
+      blockId: BlockId,
+      blockData: ManagedBuffer,
+      level: StorageLevel): Future[Unit] = {
+    val result = Promise[Unit]()
+    val client = clientFactory.createClient(hostname, port)
+
+    // Convert or copy nio buffer into array in order to serialize it.
+    val nioBuffer = blockData.nioByteBuffer()
+    val array = if (nioBuffer.hasArray) {
+      nioBuffer.array()
+    } else {
+      val data = new Array[Byte](nioBuffer.remaining())
+      nioBuffer.get(data)
+      data
+    }
+
+    val ser = serializer.newInstance()
+    client.sendRpc(ser.serialize(new UploadBlock(blockId, array, level)).array(),
+      new RpcResponseCallback {
+        override def onSuccess(response: Array[Byte]): Unit = {
+          logTrace(s"Successfully uploaded block $blockId")
+          result.success()
+        }
+        override def onFailure(e: Throwable): Unit = {
+          logError(s"Error while uploading block $blockId", e)
+          result.failure(e)
+        }
+      })
+
+    result.future
+  }
+
+  override def close(): Unit = server.close()
+}
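
A rough lifecycle sketch (not part of this commit) of wiring the service above together; in Spark itself this is done by SparkEnv and BlockManager, so the standalone helper below is purely illustrative:

    import org.apache.spark.SparkConf
    import org.apache.spark.network.BlockDataManager
    import org.apache.spark.network.netty.NettyBlockTransferService

    def startNettyService(
        conf: SparkConf,
        blockDataManager: BlockDataManager): NettyBlockTransferService = {
      val service = new NettyBlockTransferService(conf)
      // init() builds the TransportContext, client factory, and server.
      service.init(blockDataManager)
      service  // callers should invoke service.close() when finished
    }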

http://git-wip-us.apache.org/repos/asf/spark/blob/dff01553/core/src/main/scala/org/apache/spark/network/netty/NettyConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyConfig.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyConfig.scala
deleted file mode 100644
index b587015..0000000
--- a/core/src/main/scala/org/apache/spark/network/netty/NettyConfig.scala
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.netty
-
-import org.apache.spark.SparkConf
-
-/**
- * A central location that tracks all the settings we exposed to users.
- */
-private[spark]
-class NettyConfig(conf: SparkConf) {
-
-  /** Port the server listens on. Default to a random port. */
-  private[netty] val serverPort = conf.getInt("spark.shuffle.io.port", 0)
-
-  /** IO mode: nio, oio, epoll, or auto (try epoll first and then nio). */
-  private[netty] val ioMode = conf.get("spark.shuffle.io.mode", "nio").toLowerCase
-
-  /** Connect timeout in secs. Default 60 secs. */
-  private[netty] val connectTimeoutMs = conf.getInt("spark.shuffle.io.connectionTimeout", 60) * 1000
-
-  /**
-   * Percentage of the desired amount of time spent for I/O in the child event loops.
-   * Only applicable in nio and epoll.
-   */
-  private[netty] val ioRatio = conf.getInt("spark.shuffle.io.netty.ioRatio", 80)
-
-  /** Requested maximum length of the queue of incoming connections. */
-  private[netty] val backLog: Option[Int] = conf.getOption("spark.shuffle.io.backLog").map(_.toInt)
-
-  /**
-   * Receive buffer size (SO_RCVBUF).
-   * Note: the optimal size for receive buffer and send buffer should be
-   *  latency * network_bandwidth.
-   * Assuming latency = 1ms, network_bandwidth = 10Gbps
-   *  buffer size should be ~ 1.25MB
-   */
-  private[netty] val receiveBuf: Option[Int] =
-    conf.getOption("spark.shuffle.io.sendBuffer").map(_.toInt)
-
-  /** Send buffer size (SO_SNDBUF). */
-  private[netty] val sendBuf: Option[Int] =
-    conf.getOption("spark.shuffle.io.sendBuffer").map(_.toInt)
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/dff01553/core/src/main/scala/org/apache/spark/network/netty/PathResolver.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/netty/PathResolver.scala b/core/src/main/scala/org/apache/spark/network/netty/PathResolver.scala
deleted file mode 100644
index 0d76950..0000000
--- a/core/src/main/scala/org/apache/spark/network/netty/PathResolver.scala
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.netty
-
-import org.apache.spark.storage.{BlockId, FileSegment}
-
-trait PathResolver {
-  /** Get the file segment in which the given block resides. */
-  def getBlockLocation(blockId: BlockId): FileSegment
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/dff01553/core/src/main/scala/org/apache/spark/network/netty/client/BlockClientListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/netty/client/BlockClientListener.scala b/core/src/main/scala/org/apache/spark/network/netty/client/BlockClientListener.scala
deleted file mode 100644
index e28219d..0000000
--- a/core/src/main/scala/org/apache/spark/network/netty/client/BlockClientListener.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.netty.client
-
-import java.util.EventListener
-
-
-trait BlockClientListener extends EventListener {
-
-  def onFetchSuccess(blockId: String, data: ReferenceCountedBuffer): Unit
-
-  def onFetchFailure(blockId: String, errorMsg: String): Unit
-
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/dff01553/core/src/main/scala/org/apache/spark/network/netty/client/BlockFetchingClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/netty/client/BlockFetchingClient.scala b/core/src/main/scala/org/apache/spark/network/netty/client/BlockFetchingClient.scala
deleted file mode 100644
index 3ab13b9..0000000
--- a/core/src/main/scala/org/apache/spark/network/netty/client/BlockFetchingClient.scala
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.netty.client
-
-import java.util.concurrent.TimeoutException
-
-import com.google.common.base.Charsets.UTF_8
-import io.netty.bootstrap.Bootstrap
-import io.netty.buffer.PooledByteBufAllocator
-import io.netty.channel.socket.SocketChannel
-import io.netty.channel.{ChannelFutureListener, ChannelFuture, ChannelInitializer, ChannelOption}
-import io.netty.handler.codec.LengthFieldBasedFrameDecoder
-import io.netty.handler.codec.string.StringEncoder
-
-import org.apache.spark.Logging
-
-/**
- * Client for fetching data blocks from [[org.apache.spark.network.netty.server.BlockServer]].
- * Use [[BlockFetchingClientFactory]] to instantiate this client.
- *
- * The constructor blocks until a connection is successfully established.
- *
- * See [[org.apache.spark.network.netty.server.BlockServer]] for client/server protocol.
- *
- * Concurrency: thread safe and can be called from multiple threads.
- */
-@throws[TimeoutException]
-private[spark]
-class BlockFetchingClient(factory: BlockFetchingClientFactory, hostname: String, port: Int)
-  extends Logging {
-
-  private val handler = new BlockFetchingClientHandler
-
-  /** Netty Bootstrap for creating the TCP connection. */
-  private val bootstrap: Bootstrap = {
-    val b = new Bootstrap
-    b.group(factory.workerGroup)
-      .channel(factory.socketChannelClass)
-      // Use pooled buffers to reduce temporary buffer allocation
-      .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
-      // Disable Nagle's Algorithm since we don't want packets to wait
-      .option(ChannelOption.TCP_NODELAY, java.lang.Boolean.TRUE)
-      .option(ChannelOption.SO_KEEPALIVE, java.lang.Boolean.TRUE)
-      .option[Integer](ChannelOption.CONNECT_TIMEOUT_MILLIS, factory.conf.connectTimeoutMs)
-
-    b.handler(new ChannelInitializer[SocketChannel] {
-      override def initChannel(ch: SocketChannel): Unit = {
-        ch.pipeline
-          .addLast("encoder", new StringEncoder(UTF_8))
-          // maxFrameLength = 2G, lengthFieldOffset = 0, lengthFieldLength = 4
-          .addLast("framedLengthDecoder", new LengthFieldBasedFrameDecoder(Int.MaxValue, 0, 4))
-          .addLast("handler", handler)
-      }
-    })
-    b
-  }
-
-  /** Netty ChannelFuture for the connection. */
-  private val cf: ChannelFuture = bootstrap.connect(hostname, port)
-  if (!cf.awaitUninterruptibly(factory.conf.connectTimeoutMs)) {
-    throw new TimeoutException(
-      s"Connecting to $hostname:$port timed out (${factory.conf.connectTimeoutMs} ms)")
-  }
-
-  /**
-   * Ask the remote server for a sequence of blocks, and execute the callback.
-   *
-   * Note that this is asynchronous and returns immediately. Upstream caller should throttle the
-   * rate of fetching; otherwise we could run out of memory.
-   *
-   * @param blockIds sequence of block ids to fetch.
-   * @param listener callback to fire on fetch success / failure.
-   */
-  def fetchBlocks(blockIds: Seq[String], listener: BlockClientListener): Unit = {
-    // It's best to limit the number of "write" calls since it needs to traverse the whole pipeline.
-    // It's also best to limit the number of "flush" calls since it requires system calls.
-    // Let's concatenate the string and then call writeAndFlush once.
-    // This is also why this implementation might be more efficient than multiple, separate
-    // fetch block calls.
-    var startTime: Long = 0
-    logTrace {
-      startTime = System.nanoTime
-      s"Sending request $blockIds to $hostname:$port"
-    }
-
-    blockIds.foreach { blockId =>
-      handler.addRequest(blockId, listener)
-    }
-
-    val writeFuture = cf.channel().writeAndFlush(blockIds.mkString("\n") + "\n")
-    writeFuture.addListener(new ChannelFutureListener {
-      override def operationComplete(future: ChannelFuture): Unit = {
-        if (future.isSuccess) {
-          logTrace {
-            val timeTaken = (System.nanoTime - startTime).toDouble / 1000000
-            s"Sending request $blockIds to $hostname:$port took $timeTaken ms"
-          }
-        } else {
-          // Fail all blocks.
-          val errorMsg =
-            s"Failed to send request $blockIds to $hostname:$port: ${future.cause.getMessage}"
-          logError(errorMsg, future.cause)
-          blockIds.foreach { blockId =>
-            listener.onFetchFailure(blockId, errorMsg)
-            handler.removeRequest(blockId)
-          }
-        }
-      }
-    })
-  }
-
-  def waitForClose(): Unit = {
-    cf.channel().closeFuture().sync()
-  }
-
-  def close(): Unit = cf.channel().close()
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/dff01553/core/src/main/scala/org/apache/spark/network/netty/client/BlockFetchingClientFactory.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/netty/client/BlockFetchingClientFactory.scala b/core/src/main/scala/org/apache/spark/network/netty/client/BlockFetchingClientFactory.scala
deleted file mode 100644
index 2b28402..0000000
--- a/core/src/main/scala/org/apache/spark/network/netty/client/BlockFetchingClientFactory.scala
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.netty.client
-
-import io.netty.channel.epoll.{EpollEventLoopGroup, EpollSocketChannel}
-import io.netty.channel.nio.NioEventLoopGroup
-import io.netty.channel.oio.OioEventLoopGroup
-import io.netty.channel.socket.nio.NioSocketChannel
-import io.netty.channel.socket.oio.OioSocketChannel
-import io.netty.channel.{EventLoopGroup, Channel}
-
-import org.apache.spark.SparkConf
-import org.apache.spark.network.netty.NettyConfig
-import org.apache.spark.util.Utils
-
-/**
- * Factory for creating [[BlockFetchingClient]] by using createClient. This factory reuses
- * the worker thread pool for Netty.
- *
- * Concurrency: createClient is safe to be called from multiple threads concurrently.
- */
-private[spark]
-class BlockFetchingClientFactory(val conf: NettyConfig) {
-
-  def this(sparkConf: SparkConf) = this(new NettyConfig(sparkConf))
-
-  /** A thread factory so the threads are named (for debugging). */
-  val threadFactory = Utils.namedThreadFactory("spark-shuffle-client")
-
-  /** The following two are instantiated by the [[init]] method, depending ioMode. */
-  var socketChannelClass: Class[_ <: Channel] = _
-  var workerGroup: EventLoopGroup = _
-
-  init()
-
-  /** Initialize [[socketChannelClass]] and [[workerGroup]] based on ioMode. */
-  private def init(): Unit = {
-    def initOio(): Unit = {
-      socketChannelClass = classOf[OioSocketChannel]
-      workerGroup = new OioEventLoopGroup(0, threadFactory)
-    }
-    def initNio(): Unit = {
-      socketChannelClass = classOf[NioSocketChannel]
-      workerGroup = new NioEventLoopGroup(0, threadFactory)
-    }
-    def initEpoll(): Unit = {
-      socketChannelClass = classOf[EpollSocketChannel]
-      workerGroup = new EpollEventLoopGroup(0, threadFactory)
-    }
-
-    conf.ioMode match {
-      case "nio" => initNio()
-      case "oio" => initOio()
-      case "epoll" => initEpoll()
-      case "auto" =>
-        // For auto mode, first try epoll (only available on Linux), then nio.
-        try {
-          initEpoll()
-        } catch {
-          // TODO: Should we log the throwable? But that always happen on non-Linux systems.
-          // Perhaps the right thing to do is to check whether the system is Linux, and then only
-          // call initEpoll on Linux.
-          case e: Throwable => initNio()
-        }
-    }
-  }
-
-  /**
-   * Create a new BlockFetchingClient connecting to the given remote host / port.
-   *
-   * This blocks until a connection is successfully established.
-   *
-   * Concurrency: This method is safe to call from multiple threads.
-   */
-  def createClient(remoteHost: String, remotePort: Int): BlockFetchingClient = {
-    new BlockFetchingClient(this, remoteHost, remotePort)
-  }
-
-  def stop(): Unit = {
-    if (workerGroup != null) {
-      workerGroup.shutdownGracefully()
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/dff01553/core/src/main/scala/org/apache/spark/network/netty/client/BlockFetchingClientHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/netty/client/BlockFetchingClientHandler.scala b/core/src/main/scala/org/apache/spark/network/netty/client/BlockFetchingClientHandler.scala
deleted file mode 100644
index d9d3f7b..0000000
--- a/core/src/main/scala/org/apache/spark/network/netty/client/BlockFetchingClientHandler.scala
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.netty.client
-
-import com.google.common.base.Charsets.UTF_8
-import io.netty.buffer.ByteBuf
-import io.netty.channel.{ChannelHandlerContext, SimpleChannelInboundHandler}
-
-import org.apache.spark.Logging
-
-
-/**
- * Handler that processes server responses. It uses the protocol documented in
- * [[org.apache.spark.network.netty.server.BlockServer]].
- *
- * Concurrency: thread safe and can be called from multiple threads.
- */
-private[client]
-class BlockFetchingClientHandler extends SimpleChannelInboundHandler[ByteBuf] with Logging {
-
-  /** Tracks the list of outstanding requests and their listeners on success/failure. */
-  private val outstandingRequests = java.util.Collections.synchronizedMap {
-    new java.util.HashMap[String, BlockClientListener]
-  }
-
-  def addRequest(blockId: String, listener: BlockClientListener): Unit = {
-    outstandingRequests.put(blockId, listener)
-  }
-
-  def removeRequest(blockId: String): Unit = {
-    outstandingRequests.remove(blockId)
-  }
-
-  override def exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable): Unit = {
-    val errorMsg = s"Exception in connection from ${ctx.channel.remoteAddress}: ${cause.getMessage}"
-    logError(errorMsg, cause)
-
-    // Fire the failure callback for all outstanding blocks
-    outstandingRequests.synchronized {
-      val iter = outstandingRequests.entrySet().iterator()
-      while (iter.hasNext) {
-        val entry = iter.next()
-        entry.getValue.onFetchFailure(entry.getKey, errorMsg)
-      }
-      outstandingRequests.clear()
-    }
-
-    ctx.close()
-  }
-
-  override def channelRead0(ctx: ChannelHandlerContext, in: ByteBuf) {
-    val totalLen = in.readInt()
-    val blockIdLen = in.readInt()
-    val blockIdBytes = new Array[Byte](math.abs(blockIdLen))
-    in.readBytes(blockIdBytes)
-    val blockId = new String(blockIdBytes, UTF_8)
-    val blockSize = totalLen - math.abs(blockIdLen) - 4
-
-    def server = ctx.channel.remoteAddress.toString
-
-    // blockIdLen is negative when it is an error message.
-    if (blockIdLen < 0) {
-      val errorMessageBytes = new Array[Byte](blockSize)
-      in.readBytes(errorMessageBytes)
-      val errorMsg = new String(errorMessageBytes, UTF_8)
-      logTrace(s"Received block $blockId ($blockSize B) with error $errorMsg from $server")
-
-      val listener = outstandingRequests.get(blockId)
-      if (listener == null) {
-        // Ignore callback
-        logWarning(s"Got a response for block $blockId but it is not in our outstanding requests")
-      } else {
-        outstandingRequests.remove(blockId)
-        listener.onFetchFailure(blockId, errorMsg)
-      }
-    } else {
-      logTrace(s"Received block $blockId ($blockSize B) from $server")
-
-      val listener = outstandingRequests.get(blockId)
-      if (listener == null) {
-        // Ignore callback
-        logWarning(s"Got a response for block $blockId but it is not in our outstanding requests")
-      } else {
-        outstandingRequests.remove(blockId)
-        listener.onFetchSuccess(blockId, new ReferenceCountedBuffer(in))
-      }
-    }
-  }
-}
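
To make the framing parsed by channelRead0 above concrete, here is a small standalone sketch
(not part of the patch) that encodes and then decodes one successful response for a made-up
block id and a 5-byte payload, mirroring the same length arithmetic. It only assumes Netty's
Unpooled allocator and Guava's UTF_8 constant, both already used in this code.

    import com.google.common.base.Charsets.UTF_8
    import io.netty.buffer.Unpooled

    object FrameExample extends App {
      val blockId = "b1".getBytes(UTF_8)
      val data = Array[Byte](1, 2, 3, 4, 5)

      // Server-side encoding: frame-length excludes itself and covers the
      // block-id-length field (4 bytes) plus the block id and the block data.
      val buf = Unpooled.buffer()
      buf.writeInt(4 + blockId.length + data.length)  // totalLen = 11
      buf.writeInt(blockId.length)                    // positive => success
      buf.writeBytes(blockId)
      buf.writeBytes(data)

      // Client-side parse, mirroring channelRead0.
      val totalLen = buf.readInt()
      val blockIdLen = buf.readInt()
      val idBytes = new Array[Byte](math.abs(blockIdLen))
      buf.readBytes(idBytes)
      val blockSize = totalLen - math.abs(blockIdLen) - 4
      println(s"block=${new String(idBytes, UTF_8)} size=$blockSize")  // block=b1 size=5
    }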

http://git-wip-us.apache.org/repos/asf/spark/blob/dff01553/core/src/main/scala/org/apache/spark/network/netty/client/LazyInitIterator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/netty/client/LazyInitIterator.scala b/core/src/main/scala/org/apache/spark/network/netty/client/LazyInitIterator.scala
deleted file mode 100644
index 9740ee6..0000000
--- a/core/src/main/scala/org/apache/spark/network/netty/client/LazyInitIterator.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.netty.client
-
-/**
- * A simple iterator that lazily initializes the underlying iterator.
- *
- * The use case is that we might have many iterators open at the same time, and each iterator
- * might allocate its own buffers (e.g. a decompression buffer, a deserialization buffer).
- * This could lead to too many open buffers. With this iterator, those buffers are allocated
- * lazily, only when the iterator is first consumed.
- */
-private[spark]
-class LazyInitIterator(createIterator: => Iterator[Any]) extends Iterator[Any] {
-
-  lazy val proxy = createIterator
-
-  override def hasNext: Boolean = {
-    val gotNext = proxy.hasNext
-    if (!gotNext) {
-      close()
-    }
-    gotNext
-  }
-
-  override def next(): Any = proxy.next()
-
-  def close(): Unit = ()
-}
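
A quick illustration of the laziness described above, with made-up data and ignoring the
private[spark] visibility: the by-name createIterator block does not run until the wrapper is
first consumed.

    // Nothing is allocated when the wrapper is constructed...
    val lazyIter = new LazyInitIterator({
      println("allocating decompression buffer...")  // runs only on first hasNext/next
      Iterator(1, 2, 3)
    })
    // ...the underlying iterator (and its buffers) is created here, on first use.
    lazyIter.foreach(println)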

http://git-wip-us.apache.org/repos/asf/spark/blob/dff01553/core/src/main/scala/org/apache/spark/network/netty/client/ReferenceCountedBuffer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/netty/client/ReferenceCountedBuffer.scala b/core/src/main/scala/org/apache/spark/network/netty/client/ReferenceCountedBuffer.scala
deleted file mode 100644
index ea1abf5..0000000
--- a/core/src/main/scala/org/apache/spark/network/netty/client/ReferenceCountedBuffer.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.netty.client
-
-import java.io.InputStream
-import java.nio.ByteBuffer
-
-import io.netty.buffer.{ByteBuf, ByteBufInputStream}
-
-
-/**
- * A buffer abstraction based on Netty's ByteBuf so we don't expose Netty.
- * This is a Scala value class.
- *
- * The buffer's life cycle is NOT managed by the JVM garbage collector; callers must manage the
- * reference count explicitly via the retain and release methods.
- */
-private[spark]
-class ReferenceCountedBuffer(val underlying: ByteBuf) extends AnyVal {
-
-  /** Return the nio ByteBuffer view of the underlying buffer. */
-  def byteBuffer(): ByteBuffer = underlying.nioBuffer
-
-  /** Creates a new input stream that starts from the current position of the buffer. */
-  def inputStream(): InputStream = new ByteBufInputStream(underlying)
-
-  /** Increment the reference counter by one. */
-  def retain(): Unit = underlying.retain()
-
-  /** Decrement the reference counter by one and release the buffer if the ref count is 0. */
-  def release(): Unit = underlying.release()
-}
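
A minimal sketch of the retain/release contract described above, using an unpooled Netty buffer
for illustration and again ignoring the private[spark] visibility:

    import com.google.common.base.Charsets.UTF_8
    import io.netty.buffer.Unpooled

    val buf = new ReferenceCountedBuffer(Unpooled.wrappedBuffer("hello".getBytes(UTF_8)))
    buf.retain()                // ref count 2: keep the bytes alive beyond this handler
    val in = buf.inputStream()  // stream view over the same backing bytes
    // ... consume the stream ...
    buf.release()               // ref count back to 1
    buf.release()               // ref count 0: Netty frees the backing buffer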

http://git-wip-us.apache.org/repos/asf/spark/blob/dff01553/core/src/main/scala/org/apache/spark/network/netty/server/BlockHeader.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/netty/server/BlockHeader.scala b/core/src/main/scala/org/apache/spark/network/netty/server/BlockHeader.scala
deleted file mode 100644
index 162e9cc..0000000
--- a/core/src/main/scala/org/apache/spark/network/netty/server/BlockHeader.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.netty.server
-
-/**
- * Header describing a block. This is used only in the server pipeline.
- *
- * [[BlockServerHandler]] creates this, and [[BlockHeaderEncoder]] encodes it.
- *
- * @param blockSize length of the block content, excluding the length field itself.
- *                  If positive, this is the header for a block whose data is sent separately
- *                  (the block data is not part of the header frame).
- *                  If negative, this frame carries an error message instead of block data, and
- *                  the error text is encoded into the header frame itself.
- * @param blockId block id
- * @param error the error message from reading the block, if any
- */
-private[server]
-class BlockHeader(val blockSize: Int, val blockId: String, val error: Option[String] = None)

http://git-wip-us.apache.org/repos/asf/spark/blob/dff01553/core/src/main/scala/org/apache/spark/network/netty/server/BlockHeaderEncoder.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/netty/server/BlockHeaderEncoder.scala b/core/src/main/scala/org/apache/spark/network/netty/server/BlockHeaderEncoder.scala
deleted file mode 100644
index 8e4dda4..0000000
--- a/core/src/main/scala/org/apache/spark/network/netty/server/BlockHeaderEncoder.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.netty.server
-
-import com.google.common.base.Charsets.UTF_8
-import io.netty.buffer.ByteBuf
-import io.netty.channel.ChannelHandlerContext
-import io.netty.handler.codec.MessageToByteEncoder
-
-/**
- * A simple encoder for BlockHeader. See [[BlockServer]] for the server to client protocol.
- */
-private[server]
-class BlockHeaderEncoder extends MessageToByteEncoder[BlockHeader] {
-  override def encode(ctx: ChannelHandlerContext, msg: BlockHeader, out: ByteBuf): Unit = {
-    // message = message-length (4 bytes) + block-id-length (4 bytes) + block id + block data
-    // message-length = 4 (the block-id-length field) + size of block id + size of block data
-    val blockIdBytes = msg.blockId.getBytes(UTF_8)
-    msg.error match {
-      case Some(errorMsg) =>
-        val errorBytes = errorMsg.getBytes(UTF_8)
-        out.writeInt(4 + blockIdBytes.length + errorBytes.size)
-        out.writeInt(-blockIdBytes.length)  // use negative block id length to represent errors
-        out.writeBytes(blockIdBytes)  // next is blockId itself
-        out.writeBytes(errorBytes)  // error message
-      case None =>
-        out.writeInt(4 + blockIdBytes.length + msg.blockSize)
-        out.writeInt(blockIdBytes.length)  // First 4 bytes is blockId length
-        out.writeBytes(blockIdBytes)  // next is blockId itself
-        // msg of size blockSize will be written by ServerHandler
-    }
-  }
-}
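
As a concrete check of the encoding above: for a hypothetical error response with block id
"shuffle_0_1_2" (13 bytes) and error text "not found" (9 bytes), the encoder writes a frame
length of 4 + 13 + 9 = 26 followed by -13 as the block id length. The sketch below drives the
encoder through Netty's EmbeddedChannel (a test-only channel), ignoring the private[server]
visibility; it is not part of this patch.

    import io.netty.buffer.ByteBuf
    import io.netty.channel.embedded.EmbeddedChannel

    val channel = new EmbeddedChannel(new BlockHeaderEncoder)
    channel.writeOutbound(new BlockHeader(-1, "shuffle_0_1_2", Some("not found")))

    val out = channel.readOutbound().asInstanceOf[ByteBuf]
    assert(out.readInt() == 4 + 13 + 9)  // frame length: id-length field + block id + error text
    assert(out.readInt() == -13)         // negative block id length marks an error frame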

http://git-wip-us.apache.org/repos/asf/spark/blob/dff01553/core/src/main/scala/org/apache/spark/network/netty/server/BlockServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/netty/server/BlockServer.scala b/core/src/main/scala/org/apache/spark/network/netty/server/BlockServer.scala
deleted file mode 100644
index 9194c7c..0000000
--- a/core/src/main/scala/org/apache/spark/network/netty/server/BlockServer.scala
+++ /dev/null
@@ -1,162 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.netty.server
-
-import java.net.InetSocketAddress
-
-import com.google.common.base.Charsets.UTF_8
-import io.netty.bootstrap.ServerBootstrap
-import io.netty.buffer.PooledByteBufAllocator
-import io.netty.channel.{ChannelFuture, ChannelInitializer, ChannelOption}
-import io.netty.channel.epoll.{EpollEventLoopGroup, EpollServerSocketChannel}
-import io.netty.channel.nio.NioEventLoopGroup
-import io.netty.channel.oio.OioEventLoopGroup
-import io.netty.channel.socket.SocketChannel
-import io.netty.channel.socket.nio.NioServerSocketChannel
-import io.netty.channel.socket.oio.OioServerSocketChannel
-import io.netty.handler.codec.LineBasedFrameDecoder
-import io.netty.handler.codec.string.StringDecoder
-
-import org.apache.spark.{Logging, SparkConf}
-import org.apache.spark.network.netty.NettyConfig
-import org.apache.spark.storage.BlockDataProvider
-import org.apache.spark.util.Utils
-
-
-/**
- * Server for serving Spark data blocks.
- * This should be used together with [[org.apache.spark.network.netty.client.BlockFetchingClient]].
- *
- * Protocol for requesting blocks (client to server):
- *   One block id per line, e.g. to request 3 blocks: "block1\nblock2\nblock3\n"
- *
- * Protocol for sending blocks (server to client):
- *   frame-length (4 bytes), block-id-length (4 bytes), block-id, block-data.
- *
- *   frame-length does not include the length of the frame-length field itself.
- *   If block-id-length is negative, the frame carries an error message rather than block-data,
- *   and the real block id length is the absolute value of block-id-length.
- *
- */
-private[spark]
-class BlockServer(conf: NettyConfig, dataProvider: BlockDataProvider) extends Logging {
-
-  def this(sparkConf: SparkConf, dataProvider: BlockDataProvider) = {
-    this(new NettyConfig(sparkConf), dataProvider)
-  }
-
-  def port: Int = _port
-
-  def hostName: String = _hostName
-
-  private var _port: Int = conf.serverPort
-  private var _hostName: String = ""
-  private var bootstrap: ServerBootstrap = _
-  private var channelFuture: ChannelFuture = _
-
-  init()
-
-  /** Initialize the server. */
-  private def init(): Unit = {
-    bootstrap = new ServerBootstrap
-    val bossThreadFactory = Utils.namedThreadFactory("spark-shuffle-server-boss")
-    val workerThreadFactory = Utils.namedThreadFactory("spark-shuffle-server-worker")
-
-    // Use only one thread to accept connections, and 2 * num_cores for worker.
-    def initNio(): Unit = {
-      val bossGroup = new NioEventLoopGroup(1, bossThreadFactory)
-      val workerGroup = new NioEventLoopGroup(0, workerThreadFactory)
-      workerGroup.setIoRatio(conf.ioRatio)
-      bootstrap.group(bossGroup, workerGroup).channel(classOf[NioServerSocketChannel])
-    }
-    def initOio(): Unit = {
-      val bossGroup = new OioEventLoopGroup(1, bossThreadFactory)
-      val workerGroup = new OioEventLoopGroup(0, workerThreadFactory)
-      bootstrap.group(bossGroup, workerGroup).channel(classOf[OioServerSocketChannel])
-    }
-    def initEpoll(): Unit = {
-      val bossGroup = new EpollEventLoopGroup(1, bossThreadFactory)
-      val workerGroup = new EpollEventLoopGroup(0, workerThreadFactory)
-      workerGroup.setIoRatio(conf.ioRatio)
-      bootstrap.group(bossGroup, workerGroup).channel(classOf[EpollServerSocketChannel])
-    }
-
-    conf.ioMode match {
-      case "nio" => initNio()
-      case "oio" => initOio()
-      case "epoll" => initEpoll()
-      case "auto" =>
-        // For auto mode, first try epoll (only available on Linux), then nio.
-        try {
-          initEpoll()
-        } catch {
-          // TODO: Should we log the throwable? But that always happens on non-Linux systems.
-          // Perhaps the right thing to do is to check whether the system is Linux, and then only
-          // call initEpoll on Linux.
-          case e: Throwable => initNio()
-        }
-    }
-
-    // Use pooled buffers to reduce temporary buffer allocation
-    bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
-    bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
-
-    // Various (advanced) user-configured settings.
-    conf.backLog.foreach { backLog =>
-      bootstrap.option[java.lang.Integer](ChannelOption.SO_BACKLOG, backLog)
-    }
-    conf.receiveBuf.foreach { receiveBuf =>
-      bootstrap.option[java.lang.Integer](ChannelOption.SO_RCVBUF, receiveBuf)
-    }
-    conf.sendBuf.foreach { sendBuf =>
-      bootstrap.option[java.lang.Integer](ChannelOption.SO_SNDBUF, sendBuf)
-    }
-
-    bootstrap.childHandler(new ChannelInitializer[SocketChannel] {
-      override def initChannel(ch: SocketChannel): Unit = {
-        ch.pipeline
-          .addLast("frameDecoder", new LineBasedFrameDecoder(1024))  // max block id length 1024
-          .addLast("stringDecoder", new StringDecoder(UTF_8))
-          .addLast("blockHeaderEncoder", new BlockHeaderEncoder)
-          .addLast("handler", new BlockServerHandler(dataProvider))
-      }
-    })
-
-    channelFuture = bootstrap.bind(new InetSocketAddress(_port))
-    channelFuture.sync()
-
-    val addr = channelFuture.channel.localAddress.asInstanceOf[InetSocketAddress]
-    _port = addr.getPort
-    _hostName = addr.getHostName
-  }
-
-  /** Shutdown the server. */
-  def stop(): Unit = {
-    if (channelFuture != null) {
-      channelFuture.channel().close().awaitUninterruptibly()
-      channelFuture = null
-    }
-    if (bootstrap != null && bootstrap.group() != null) {
-      bootstrap.group().shutdownGracefully()
-    }
-    if (bootstrap != null && bootstrap.childGroup() != null) {
-      bootstrap.childGroup().shutdownGracefully()
-    }
-    bootstrap = null
-  }
-}
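
The request side of the protocol documented above is just newline-separated block ids in UTF-8
text, so a raw socket is enough to issue a fetch by hand. The host, port, and block ids below are
placeholders; reading back the binary response frames (frame-length, block-id-length, block-id,
data) is left out, and is what BlockFetchingClientHandler earlier in this diff implements.

    import java.io.OutputStreamWriter
    import java.net.Socket

    import com.google.common.base.Charsets.UTF_8

    // Placeholder host/port; a real client would go through BlockFetchingClientFactory.
    val socket = new Socket("localhost", 12345)
    val writer = new OutputStreamWriter(socket.getOutputStream, UTF_8)
    writer.write("shuffle_0_0_0\nshuffle_0_1_0\n")  // request two blocks, one id per line
    writer.flush()
    // ... read and parse the response frames here ...
    socket.close()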

http://git-wip-us.apache.org/repos/asf/spark/blob/dff01553/core/src/main/scala/org/apache/spark/network/netty/server/BlockServerChannelInitializer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/netty/server/BlockServerChannelInitializer.scala b/core/src/main/scala/org/apache/spark/network/netty/server/BlockServerChannelInitializer.scala
deleted file mode 100644
index 188154d..0000000
--- a/core/src/main/scala/org/apache/spark/network/netty/server/BlockServerChannelInitializer.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.netty.server
-
-import com.google.common.base.Charsets.UTF_8
-import io.netty.channel.ChannelInitializer
-import io.netty.channel.socket.SocketChannel
-import io.netty.handler.codec.LineBasedFrameDecoder
-import io.netty.handler.codec.string.StringDecoder
-
-import org.apache.spark.storage.BlockDataProvider
-
-/** Channel initializer that sets up the pipeline for the BlockServer. */
-private[netty]
-class BlockServerChannelInitializer(dataProvider: BlockDataProvider)
-  extends ChannelInitializer[SocketChannel] {
-
-  override def initChannel(ch: SocketChannel): Unit = {
-    ch.pipeline
-      .addLast("frameDecoder", new LineBasedFrameDecoder(1024))  // max block id length 1024
-      .addLast("stringDecoder", new StringDecoder(UTF_8))
-      .addLast("blockHeaderEncoder", new BlockHeaderEncoder)
-      .addLast("handler", new BlockServerHandler(dataProvider))
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/dff01553/core/src/main/scala/org/apache/spark/network/netty/server/BlockServerHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/netty/server/BlockServerHandler.scala b/core/src/main/scala/org/apache/spark/network/netty/server/BlockServerHandler.scala
deleted file mode 100644
index 40dd5e5..0000000
--- a/core/src/main/scala/org/apache/spark/network/netty/server/BlockServerHandler.scala
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.netty.server
-
-import java.io.FileInputStream
-import java.nio.ByteBuffer
-import java.nio.channels.FileChannel
-
-import io.netty.buffer.Unpooled
-import io.netty.channel._
-
-import org.apache.spark.Logging
-import org.apache.spark.storage.{FileSegment, BlockDataProvider}
-
-
-/**
- * A handler that processes requests from clients and writes block data back.
- *
- * The messages should have been processed by a LineBasedFrameDecoder and a StringDecoder first
- * so channelRead0 is called once per line (i.e. per block id).
- */
-private[server]
-class BlockServerHandler(dataProvider: BlockDataProvider)
-  extends SimpleChannelInboundHandler[String] with Logging {
-
-  override def exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable): Unit = {
-    logError(s"Exception in connection from ${ctx.channel.remoteAddress}", cause)
-    ctx.close()
-  }
-
-  override def channelRead0(ctx: ChannelHandlerContext, blockId: String): Unit = {
-    def client = ctx.channel.remoteAddress.toString
-
-    // A helper function to send error message back to the client.
-    def respondWithError(error: String): Unit = {
-      ctx.writeAndFlush(new BlockHeader(-1, blockId, Some(error))).addListener(
-        new ChannelFutureListener {
-          override def operationComplete(future: ChannelFuture) {
-            if (!future.isSuccess) {
-              // TODO: Maybe log the success case as well.
-              logError(s"Error sending error back to $client", future.cause)
-              ctx.close()
-            }
-          }
-        }
-      )
-    }
-
-    def writeFileSegment(segment: FileSegment): Unit = {
-      // Send an error message back if the block is too large. Even though we are capable of
-      // sending large (2G+) blocks, the receiving end cannot handle them, so fail fast here.
-      // Once the receiving end can process large blocks, this check should be removed.
-      // Also make sure BlockHeaderEncoder is updated to support lengths > 2G.
-
-      // See [[BlockHeaderEncoder]] for the way length is encoded.
-      if (segment.length + blockId.length + 4 > Int.MaxValue) {
-        respondWithError(s"Block $blockId size (${segment.length}) greater than 2G")
-        return
-      }
-
-      var fileChannel: FileChannel = null
-      try {
-        fileChannel = new FileInputStream(segment.file).getChannel
-      } catch {
-        case e: Exception =>
-          logError(
-            s"Error opening channel for $blockId in ${segment.file} for request from $client", e)
-          respondWithError(e.getMessage)
-      }
-
-      // Found the block. Send it back.
-      if (fileChannel != null) {
-        // Write the header and block data. In the case of failures, the listener on the block data
-        // write should close the connection.
-        ctx.write(new BlockHeader(segment.length.toInt, blockId))
-
-        val region = new DefaultFileRegion(fileChannel, segment.offset, segment.length)
-        ctx.writeAndFlush(region).addListener(new ChannelFutureListener {
-          override def operationComplete(future: ChannelFuture) {
-            if (future.isSuccess) {
-              logTrace(s"Sent block $blockId (${segment.length} B) back to $client")
-            } else {
-              logError(s"Error sending block $blockId to $client; closing connection", future.cause)
-              ctx.close()
-            }
-          }
-        })
-      }
-    }
-
-    def writeByteBuffer(buf: ByteBuffer): Unit = {
-      ctx.write(new BlockHeader(buf.remaining, blockId))
-      ctx.writeAndFlush(Unpooled.wrappedBuffer(buf)).addListener(new ChannelFutureListener {
-        override def operationComplete(future: ChannelFuture) {
-          if (future.isSuccess) {
-            logTrace(s"Sent block $blockId (${buf.remaining} B) back to $client")
-          } else {
-            logError(s"Error sending block $blockId to $client; closing connection", future.cause)
-            ctx.close()
-          }
-        }
-      })
-    }
-
-    logTrace(s"Received request from $client to fetch block $blockId")
-
-    var blockData: Either[FileSegment, ByteBuffer] = null
-
-    // First make sure we can find the block. If not, send error back to the user.
-    try {
-      blockData = dataProvider.getBlockData(blockId)
-    } catch {
-      case e: Exception =>
-        logError(s"Error opening block $blockId for request from $client", e)
-        respondWithError(e.getMessage)
-        return
-    }
-
-    blockData match {
-      case Left(segment) => writeFileSegment(segment)
-      case Right(buf) => writeByteBuffer(buf)
-    }
-
-  }  // end of channelRead0
-}
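
The handler above obtains data through a BlockDataProvider, returning either a FileSegment (sent
zero-copy via DefaultFileRegion) or an in-memory ByteBuffer. Below is a minimal in-memory
provider, written under the assumption (suggested by the pattern match in channelRead0) that the
trait's method is getBlockData(blockId: String): Either[FileSegment, ByteBuffer]; it is purely
illustrative and not part of this patch.

    import java.nio.ByteBuffer

    import org.apache.spark.storage.{BlockDataProvider, FileSegment}

    // Serve blocks out of an in-memory map. A missing block throws, which channelRead0
    // above catches and turns into an error frame for the client.
    class InMemoryBlockDataProvider(blocks: Map[String, Array[Byte]]) extends BlockDataProvider {
      override def getBlockData(blockId: String): Either[FileSegment, ByteBuffer] = {
        blocks.get(blockId) match {
          case Some(bytes) => Right(ByteBuffer.wrap(bytes))
          case None => throw new IllegalArgumentException(s"Block $blockId not found")
        }
      }
    }

Such a provider could then be handed to the BlockServer constructor above in place of the real
block manager, e.g. to exercise the wire protocol in isolation.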


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org