Posted to commits@spark.apache.org by rx...@apache.org on 2016/02/29 02:25:30 UTC

[14/14] spark git commit: [SPARK-13529][BUILD] Move network/* modules into common/network-*

[SPARK-13529][BUILD] Move network/* modules into common/network-*

## What changes were proposed in this pull request?
As the title says, this moves the three modules currently in network/ into common/network-*. This removes one top-level, non-user-facing folder.

## How was this patch tested?
Compilation and existing tests. We should run both the SBT and Maven builds.

Author: Reynold Xin <rx...@databricks.com>

Closes #11409 from rxin/SPARK-13529.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9e01dcc6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9e01dcc6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9e01dcc6

Branch: refs/heads/master
Commit: 9e01dcc6446f8648e61062f8afe62589b9d4b5ab
Parents: cca79fa
Author: Reynold Xin <rx...@databricks.com>
Authored: Sun Feb 28 17:25:07 2016 -0800
Committer: Reynold Xin <rx...@databricks.com>
Committed: Sun Feb 28 17:25:07 2016 -0800

----------------------------------------------------------------------
 common/network-common/pom.xml                   | 103 ++++
 .../apache/spark/network/TransportContext.java  | 166 +++++++
 .../buffer/FileSegmentManagedBuffer.java        | 154 ++++++
 .../spark/network/buffer/LazyFileRegion.java    | 111 +++++
 .../spark/network/buffer/ManagedBuffer.java     |  75 +++
 .../network/buffer/NettyManagedBuffer.java      |  76 +++
 .../spark/network/buffer/NioManagedBuffer.java  |  75 +++
 .../client/ChunkFetchFailureException.java      |  31 ++
 .../network/client/ChunkReceivedCallback.java   |  47 ++
 .../network/client/RpcResponseCallback.java     |  32 ++
 .../spark/network/client/StreamCallback.java    |  40 ++
 .../spark/network/client/StreamInterceptor.java |  86 ++++
 .../spark/network/client/TransportClient.java   | 321 +++++++++++++
 .../client/TransportClientBootstrap.java        |  34 ++
 .../network/client/TransportClientFactory.java  | 264 ++++++++++
 .../client/TransportResponseHandler.java        | 251 ++++++++++
 .../spark/network/protocol/AbstractMessage.java |  54 +++
 .../protocol/AbstractResponseMessage.java       |  32 ++
 .../network/protocol/ChunkFetchFailure.java     |  76 +++
 .../network/protocol/ChunkFetchRequest.java     |  71 +++
 .../network/protocol/ChunkFetchSuccess.java     |  89 ++++
 .../spark/network/protocol/Encodable.java       |  41 ++
 .../apache/spark/network/protocol/Encoders.java |  92 ++++
 .../apache/spark/network/protocol/Message.java  |  73 +++
 .../spark/network/protocol/MessageDecoder.java  |  82 ++++
 .../spark/network/protocol/MessageEncoder.java  |  93 ++++
 .../network/protocol/MessageWithHeader.java     | 135 ++++++
 .../spark/network/protocol/OneWayMessage.java   |  80 ++++
 .../spark/network/protocol/RequestMessage.java  |  25 +
 .../spark/network/protocol/ResponseMessage.java |  25 +
 .../spark/network/protocol/RpcFailure.java      |  74 +++
 .../spark/network/protocol/RpcRequest.java      |  87 ++++
 .../spark/network/protocol/RpcResponse.java     |  87 ++++
 .../spark/network/protocol/StreamChunkId.java   |  73 +++
 .../spark/network/protocol/StreamFailure.java   |  80 ++++
 .../spark/network/protocol/StreamRequest.java   |  78 +++
 .../spark/network/protocol/StreamResponse.java  |  92 ++++
 .../spark/network/sasl/SaslClientBootstrap.java | 109 +++++
 .../spark/network/sasl/SaslEncryption.java      | 291 ++++++++++++
 .../network/sasl/SaslEncryptionBackend.java     |  33 ++
 .../apache/spark/network/sasl/SaslMessage.java  |  78 +++
 .../spark/network/sasl/SaslRpcHandler.java      | 158 ++++++
 .../spark/network/sasl/SaslServerBootstrap.java |  49 ++
 .../spark/network/sasl/SecretKeyHolder.java     |  35 ++
 .../spark/network/sasl/SparkSaslClient.java     | 162 +++++++
 .../spark/network/sasl/SparkSaslServer.java     | 200 ++++++++
 .../spark/network/server/MessageHandler.java    |  39 ++
 .../spark/network/server/NoOpRpcHandler.java    |  40 ++
 .../network/server/OneForOneStreamManager.java  | 143 ++++++
 .../apache/spark/network/server/RpcHandler.java | 100 ++++
 .../spark/network/server/StreamManager.java     |  86 ++++
 .../network/server/TransportChannelHandler.java | 163 +++++++
 .../network/server/TransportRequestHandler.java | 209 ++++++++
 .../spark/network/server/TransportServer.java   | 151 ++++++
 .../server/TransportServerBootstrap.java        |  36 ++
 .../network/util/ByteArrayWritableChannel.java  |  69 +++
 .../org/apache/spark/network/util/ByteUnit.java |  67 +++
 .../spark/network/util/ConfigProvider.java      |  52 ++
 .../org/apache/spark/network/util/IOMode.java   |  27 ++
 .../apache/spark/network/util/JavaUtils.java    | 303 ++++++++++++
 .../spark/network/util/LimitedInputStream.java  | 105 ++++
 .../spark/network/util/MapConfigProvider.java   |  41 ++
 .../apache/spark/network/util/NettyUtils.java   | 139 ++++++
 .../util/SystemPropertyConfigProvider.java      |  34 ++
 .../spark/network/util/TransportConf.java       | 169 +++++++
 .../network/util/TransportFrameDecoder.java     | 227 +++++++++
 .../network/ChunkFetchIntegrationSuite.java     | 244 ++++++++++
 .../org/apache/spark/network/ProtocolSuite.java | 127 +++++
 .../network/RequestTimeoutIntegrationSuite.java | 288 +++++++++++
 .../spark/network/RpcIntegrationSuite.java      | 215 +++++++++
 .../org/apache/spark/network/StreamSuite.java   | 349 ++++++++++++++
 .../apache/spark/network/TestManagedBuffer.java | 109 +++++
 .../org/apache/spark/network/TestUtils.java     |  30 ++
 .../network/TransportClientFactorySuite.java    | 214 +++++++++
 .../network/TransportResponseHandlerSuite.java  | 146 ++++++
 .../protocol/MessageWithHeaderSuite.java        | 157 ++++++
 .../spark/network/sasl/SparkSaslSuite.java      | 476 +++++++++++++++++++
 .../server/OneForOneStreamManagerSuite.java     |  50 ++
 .../util/TransportFrameDecoderSuite.java        | 258 ++++++++++
 .../src/test/resources/log4j.properties         |  27 ++
 common/network-shuffle/pom.xml                  | 101 ++++
 .../network/sasl/ShuffleSecretManager.java      |  97 ++++
 .../network/shuffle/BlockFetchingListener.java  |  36 ++
 .../shuffle/ExternalShuffleBlockHandler.java    | 140 ++++++
 .../shuffle/ExternalShuffleBlockResolver.java   | 449 +++++++++++++++++
 .../network/shuffle/ExternalShuffleClient.java  | 154 ++++++
 .../network/shuffle/OneForOneBlockFetcher.java  | 129 +++++
 .../network/shuffle/RetryingBlockFetcher.java   | 234 +++++++++
 .../spark/network/shuffle/ShuffleClient.java    |  44 ++
 .../mesos/MesosExternalShuffleClient.java       |  73 +++
 .../shuffle/protocol/BlockTransferMessage.java  |  81 ++++
 .../shuffle/protocol/ExecutorShuffleInfo.java   |  94 ++++
 .../network/shuffle/protocol/OpenBlocks.java    |  90 ++++
 .../shuffle/protocol/RegisterExecutor.java      |  94 ++++
 .../network/shuffle/protocol/StreamHandle.java  |  81 ++++
 .../network/shuffle/protocol/UploadBlock.java   | 117 +++++
 .../shuffle/protocol/mesos/RegisterDriver.java  |  63 +++
 .../network/sasl/SaslIntegrationSuite.java      | 294 ++++++++++++
 .../shuffle/BlockTransferMessagesSuite.java     |  44 ++
 .../ExternalShuffleBlockHandlerSuite.java       | 127 +++++
 .../ExternalShuffleBlockResolverSuite.java      | 156 ++++++
 .../shuffle/ExternalShuffleCleanupSuite.java    | 149 ++++++
 .../ExternalShuffleIntegrationSuite.java        | 301 ++++++++++++
 .../shuffle/ExternalShuffleSecuritySuite.java   | 124 +++++
 .../shuffle/OneForOneBlockFetcherSuite.java     | 176 +++++++
 .../shuffle/RetryingBlockFetcherSuite.java      | 313 ++++++++++++
 .../network/shuffle/TestShuffleDataContext.java | 117 +++++
 common/network-yarn/pom.xml                     | 148 ++++++
 .../spark/network/yarn/YarnShuffleService.java  | 224 +++++++++
 .../network/yarn/util/HadoopConfigProvider.java |  42 ++
 dev/sparktestsupport/modules.py                 |   4 +-
 docs/job-scheduling.md                          |   2 +-
 .../spark/launcher/AbstractCommandBuilder.java  |   3 +-
 make-distribution.sh                            |   2 +-
 network/common/pom.xml                          | 103 ----
 .../apache/spark/network/TransportContext.java  | 166 -------
 .../buffer/FileSegmentManagedBuffer.java        | 154 ------
 .../spark/network/buffer/LazyFileRegion.java    | 111 -----
 .../spark/network/buffer/ManagedBuffer.java     |  75 ---
 .../network/buffer/NettyManagedBuffer.java      |  76 ---
 .../spark/network/buffer/NioManagedBuffer.java  |  75 ---
 .../client/ChunkFetchFailureException.java      |  31 --
 .../network/client/ChunkReceivedCallback.java   |  47 --
 .../network/client/RpcResponseCallback.java     |  32 --
 .../spark/network/client/StreamCallback.java    |  40 --
 .../spark/network/client/StreamInterceptor.java |  86 ----
 .../spark/network/client/TransportClient.java   | 321 -------------
 .../client/TransportClientBootstrap.java        |  34 --
 .../network/client/TransportClientFactory.java  | 264 ----------
 .../client/TransportResponseHandler.java        | 251 ----------
 .../spark/network/protocol/AbstractMessage.java |  54 ---
 .../protocol/AbstractResponseMessage.java       |  32 --
 .../network/protocol/ChunkFetchFailure.java     |  76 ---
 .../network/protocol/ChunkFetchRequest.java     |  71 ---
 .../network/protocol/ChunkFetchSuccess.java     |  89 ----
 .../spark/network/protocol/Encodable.java       |  41 --
 .../apache/spark/network/protocol/Encoders.java |  92 ----
 .../apache/spark/network/protocol/Message.java  |  73 ---
 .../spark/network/protocol/MessageDecoder.java  |  82 ----
 .../spark/network/protocol/MessageEncoder.java  |  93 ----
 .../network/protocol/MessageWithHeader.java     | 135 ------
 .../spark/network/protocol/OneWayMessage.java   |  80 ----
 .../spark/network/protocol/RequestMessage.java  |  25 -
 .../spark/network/protocol/ResponseMessage.java |  25 -
 .../spark/network/protocol/RpcFailure.java      |  74 ---
 .../spark/network/protocol/RpcRequest.java      |  87 ----
 .../spark/network/protocol/RpcResponse.java     |  87 ----
 .../spark/network/protocol/StreamChunkId.java   |  73 ---
 .../spark/network/protocol/StreamFailure.java   |  80 ----
 .../spark/network/protocol/StreamRequest.java   |  78 ---
 .../spark/network/protocol/StreamResponse.java  |  92 ----
 .../spark/network/sasl/SaslClientBootstrap.java | 109 -----
 .../spark/network/sasl/SaslEncryption.java      | 291 ------------
 .../network/sasl/SaslEncryptionBackend.java     |  33 --
 .../apache/spark/network/sasl/SaslMessage.java  |  78 ---
 .../spark/network/sasl/SaslRpcHandler.java      | 158 ------
 .../spark/network/sasl/SaslServerBootstrap.java |  49 --
 .../spark/network/sasl/SecretKeyHolder.java     |  35 --
 .../spark/network/sasl/SparkSaslClient.java     | 162 -------
 .../spark/network/sasl/SparkSaslServer.java     | 200 --------
 .../spark/network/server/MessageHandler.java    |  39 --
 .../spark/network/server/NoOpRpcHandler.java    |  40 --
 .../network/server/OneForOneStreamManager.java  | 143 ------
 .../apache/spark/network/server/RpcHandler.java | 100 ----
 .../spark/network/server/StreamManager.java     |  86 ----
 .../network/server/TransportChannelHandler.java | 163 -------
 .../network/server/TransportRequestHandler.java | 209 --------
 .../spark/network/server/TransportServer.java   | 151 ------
 .../server/TransportServerBootstrap.java        |  36 --
 .../network/util/ByteArrayWritableChannel.java  |  69 ---
 .../org/apache/spark/network/util/ByteUnit.java |  67 ---
 .../spark/network/util/ConfigProvider.java      |  52 --
 .../org/apache/spark/network/util/IOMode.java   |  27 --
 .../apache/spark/network/util/JavaUtils.java    | 303 ------------
 .../spark/network/util/LimitedInputStream.java  | 105 ----
 .../spark/network/util/MapConfigProvider.java   |  41 --
 .../apache/spark/network/util/NettyUtils.java   | 139 ------
 .../util/SystemPropertyConfigProvider.java      |  34 --
 .../spark/network/util/TransportConf.java       | 169 -------
 .../network/util/TransportFrameDecoder.java     | 227 ---------
 .../network/ChunkFetchIntegrationSuite.java     | 244 ----------
 .../org/apache/spark/network/ProtocolSuite.java | 127 -----
 .../network/RequestTimeoutIntegrationSuite.java | 288 -----------
 .../spark/network/RpcIntegrationSuite.java      | 215 ---------
 .../org/apache/spark/network/StreamSuite.java   | 349 --------------
 .../apache/spark/network/TestManagedBuffer.java | 109 -----
 .../org/apache/spark/network/TestUtils.java     |  30 --
 .../network/TransportClientFactorySuite.java    | 214 ---------
 .../network/TransportResponseHandlerSuite.java  | 146 ------
 .../protocol/MessageWithHeaderSuite.java        | 157 ------
 .../spark/network/sasl/SparkSaslSuite.java      | 476 -------------------
 .../server/OneForOneStreamManagerSuite.java     |  50 --
 .../util/TransportFrameDecoderSuite.java        | 258 ----------
 .../common/src/test/resources/log4j.properties  |  27 --
 network/shuffle/pom.xml                         | 101 ----
 .../network/sasl/ShuffleSecretManager.java      |  97 ----
 .../network/shuffle/BlockFetchingListener.java  |  36 --
 .../shuffle/ExternalShuffleBlockHandler.java    | 140 ------
 .../shuffle/ExternalShuffleBlockResolver.java   | 449 -----------------
 .../network/shuffle/ExternalShuffleClient.java  | 154 ------
 .../network/shuffle/OneForOneBlockFetcher.java  | 129 -----
 .../network/shuffle/RetryingBlockFetcher.java   | 234 ---------
 .../spark/network/shuffle/ShuffleClient.java    |  44 --
 .../mesos/MesosExternalShuffleClient.java       |  73 ---
 .../shuffle/protocol/BlockTransferMessage.java  |  81 ----
 .../shuffle/protocol/ExecutorShuffleInfo.java   |  94 ----
 .../network/shuffle/protocol/OpenBlocks.java    |  90 ----
 .../shuffle/protocol/RegisterExecutor.java      |  94 ----
 .../network/shuffle/protocol/StreamHandle.java  |  81 ----
 .../network/shuffle/protocol/UploadBlock.java   | 117 -----
 .../shuffle/protocol/mesos/RegisterDriver.java  |  63 ---
 .../network/sasl/SaslIntegrationSuite.java      | 294 ------------
 .../shuffle/BlockTransferMessagesSuite.java     |  44 --
 .../ExternalShuffleBlockHandlerSuite.java       | 127 -----
 .../ExternalShuffleBlockResolverSuite.java      | 156 ------
 .../shuffle/ExternalShuffleCleanupSuite.java    | 149 ------
 .../ExternalShuffleIntegrationSuite.java        | 301 ------------
 .../shuffle/ExternalShuffleSecuritySuite.java   | 124 -----
 .../shuffle/OneForOneBlockFetcherSuite.java     | 176 -------
 .../shuffle/RetryingBlockFetcherSuite.java      | 313 ------------
 .../network/shuffle/TestShuffleDataContext.java | 117 -----
 network/yarn/pom.xml                            | 148 ------
 .../spark/network/yarn/YarnShuffleService.java  | 224 ---------
 .../network/yarn/util/HadoopConfigProvider.java |  42 --
 pom.xml                                         |   6 +-
 225 files changed, 13711 insertions(+), 13710 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/common/network-common/pom.xml
----------------------------------------------------------------------
diff --git a/common/network-common/pom.xml b/common/network-common/pom.xml
new file mode 100644
index 0000000..bd507c2
--- /dev/null
+++ b/common/network-common/pom.xml
@@ -0,0 +1,103 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.spark</groupId>
+    <artifactId>spark-parent_2.11</artifactId>
+    <version>2.0.0-SNAPSHOT</version>
+    <relativePath>../../pom.xml</relativePath>
+  </parent>
+
+  <groupId>org.apache.spark</groupId>
+  <artifactId>spark-network-common_2.11</artifactId>
+  <packaging>jar</packaging>
+  <name>Spark Project Networking</name>
+  <url>http://spark.apache.org/</url>
+  <properties>
+    <sbt.project.name>network-common</sbt.project.name>
+  </properties>
+
+  <dependencies>
+    <!-- Core dependencies -->
+    <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty-all</artifactId>
+    </dependency>
+
+    <!-- Provided dependencies -->
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.google.code.findbugs</groupId>
+      <artifactId>jsr305</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+      <scope>compile</scope>
+    </dependency>
+
+    <!-- Test dependencies -->
+    <dependency>
+      <groupId>log4j</groupId>
+      <artifactId>log4j</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-test-tags_${scala.binary.version}</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-log4j12</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+    <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+    <plugins>
+      <!-- Create a test-jar so network-shuffle can depend on our test utilities. -->
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>test-jar-on-test-compile</id>
+            <phase>test-compile</phase>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>

http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java
----------------------------------------------------------------------
diff --git a/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java b/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java
new file mode 100644
index 0000000..238710d
--- /dev/null
+++ b/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network;
+
+import java.util.List;
+
+import com.google.common.collect.Lists;
+import io.netty.channel.Channel;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.timeout.IdleStateHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.spark.network.client.TransportClient;
+import org.apache.spark.network.client.TransportClientBootstrap;
+import org.apache.spark.network.client.TransportClientFactory;
+import org.apache.spark.network.client.TransportResponseHandler;
+import org.apache.spark.network.protocol.MessageDecoder;
+import org.apache.spark.network.protocol.MessageEncoder;
+import org.apache.spark.network.server.RpcHandler;
+import org.apache.spark.network.server.TransportChannelHandler;
+import org.apache.spark.network.server.TransportRequestHandler;
+import org.apache.spark.network.server.TransportServer;
+import org.apache.spark.network.server.TransportServerBootstrap;
+import org.apache.spark.network.util.NettyUtils;
+import org.apache.spark.network.util.TransportConf;
+import org.apache.spark.network.util.TransportFrameDecoder;
+
+/**
+ * Contains the context to create a {@link TransportServer}, {@link TransportClientFactory}, and to
+ * set up Netty Channel pipelines with a {@link org.apache.spark.network.server.TransportChannelHandler}.
+ *
+ * The TransportClient provides two communication protocols: control-plane RPCs and data-plane
+ * "chunk fetching". The handling of RPCs is performed outside the scope of the TransportContext
+ * (i.e., by a user-provided handler), and that handler is responsible for setting up streams
+ * which can be streamed through the data plane in chunks using zero-copy IO.
+ *
+ * The TransportServer and TransportClientFactory both create a TransportChannelHandler for each
+ * channel. As each TransportChannelHandler contains a TransportClient, this enables server
+ * processes to send messages back to the client on an existing channel.
+ */
+public class TransportContext {
+  private final Logger logger = LoggerFactory.getLogger(TransportContext.class);
+
+  private final TransportConf conf;
+  private final RpcHandler rpcHandler;
+  private final boolean closeIdleConnections;
+
+  private final MessageEncoder encoder;
+  private final MessageDecoder decoder;
+
+  public TransportContext(TransportConf conf, RpcHandler rpcHandler) {
+    this(conf, rpcHandler, false);
+  }
+
+  public TransportContext(
+      TransportConf conf,
+      RpcHandler rpcHandler,
+      boolean closeIdleConnections) {
+    this.conf = conf;
+    this.rpcHandler = rpcHandler;
+    this.encoder = new MessageEncoder();
+    this.decoder = new MessageDecoder();
+    this.closeIdleConnections = closeIdleConnections;
+  }
+
+  /**
+   * Initializes a ClientFactory which runs the given TransportClientBootstraps prior to returning
+   * a new Client. Bootstraps will be executed synchronously, and must run successfully in order
+   * to create a Client.
+   */
+  public TransportClientFactory createClientFactory(List<TransportClientBootstrap> bootstraps) {
+    return new TransportClientFactory(this, bootstraps);
+  }
+
+  public TransportClientFactory createClientFactory() {
+    return createClientFactory(Lists.<TransportClientBootstrap>newArrayList());
+  }
+
+  /** Create a server which will attempt to bind to a specific port. */
+  public TransportServer createServer(int port, List<TransportServerBootstrap> bootstraps) {
+    return new TransportServer(this, null, port, rpcHandler, bootstraps);
+  }
+
+  /** Create a server which will attempt to bind to a specific host and port. */
+  public TransportServer createServer(
+      String host, int port, List<TransportServerBootstrap> bootstraps) {
+    return new TransportServer(this, host, port, rpcHandler, bootstraps);
+  }
+
+  /** Creates a new server, binding to any available ephemeral port. */
+  public TransportServer createServer(List<TransportServerBootstrap> bootstraps) {
+    return createServer(0, bootstraps);
+  }
+
+  public TransportServer createServer() {
+    return createServer(0, Lists.<TransportServerBootstrap>newArrayList());
+  }
+
+  public TransportChannelHandler initializePipeline(SocketChannel channel) {
+    return initializePipeline(channel, rpcHandler);
+  }
+
+  /**
+   * Initializes a client or server Netty Channel Pipeline which encodes/decodes messages and
+   * has a {@link org.apache.spark.network.server.TransportChannelHandler} to handle request or
+   * response messages.
+   *
+   * @param channel The channel to initialize.
+   * @param channelRpcHandler The RPC handler to use for the channel.
+   *
+   * @return Returns the created TransportChannelHandler, which includes a TransportClient that can
+   * be used to communicate on this channel. The TransportClient is directly associated with a
+   * ChannelHandler to ensure all users of the same channel get the same TransportClient object.
+   */
+  public TransportChannelHandler initializePipeline(
+      SocketChannel channel,
+      RpcHandler channelRpcHandler) {
+    try {
+      TransportChannelHandler channelHandler = createChannelHandler(channel, channelRpcHandler);
+      channel.pipeline()
+        .addLast("encoder", encoder)
+        .addLast(TransportFrameDecoder.HANDLER_NAME, NettyUtils.createFrameDecoder())
+        .addLast("decoder", decoder)
+        .addLast("idleStateHandler", new IdleStateHandler(0, 0, conf.connectionTimeoutMs() / 1000))
+        // NOTE: Chunks are currently guaranteed to be returned in the order requested, but
+        // guaranteeing this would require more logic if this were not all on one event loop.
+        .addLast("handler", channelHandler);
+      return channelHandler;
+    } catch (RuntimeException e) {
+      logger.error("Error while initializing Netty pipeline", e);
+      throw e;
+    }
+  }
+
+  /**
+   * Creates the server- and client-side handler which is used to handle both RequestMessages and
+   * ResponseMessages. The channel is expected to have been successfully created, though certain
+   * properties (such as the remoteAddress()) may not be available yet.
+   */
+  private TransportChannelHandler createChannelHandler(Channel channel, RpcHandler rpcHandler) {
+    TransportResponseHandler responseHandler = new TransportResponseHandler(channel);
+    TransportClient client = new TransportClient(channel, responseHandler);
+    TransportRequestHandler requestHandler = new TransportRequestHandler(channel, client,
+      rpcHandler);
+    return new TransportChannelHandler(client, responseHandler, requestHandler,
+      conf.connectionTimeoutMs(), closeIdleConnections);
+  }
+
+  public TransportConf getConf() { return conf; }
+}
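
For context on how the pieces above fit together, the following is a minimal, hypothetical
sketch (not part of this commit) that wires a server and a client through a TransportContext
and issues one control-plane RPC. The module name "echo" is a placeholder, and the
TransportConf constructor and sendRpcSync signature are assumed to match this tree:

    import java.nio.ByteBuffer;
    import org.apache.spark.network.TransportContext;
    import org.apache.spark.network.client.RpcResponseCallback;
    import org.apache.spark.network.client.TransportClient;
    import org.apache.spark.network.server.OneForOneStreamManager;
    import org.apache.spark.network.server.RpcHandler;
    import org.apache.spark.network.server.StreamManager;
    import org.apache.spark.network.server.TransportServer;
    import org.apache.spark.network.util.SystemPropertyConfigProvider;
    import org.apache.spark.network.util.TransportConf;

    public class EchoExample {
      public static void main(String[] args) throws Exception {
        // Assumed (module, ConfigProvider) constructor; "echo" is a placeholder module name.
        TransportConf conf = new TransportConf("echo", new SystemPropertyConfigProvider());

        // The user-provided control-plane handler described in the class comment above.
        RpcHandler handler = new RpcHandler() {
          @Override
          public void receive(TransportClient client, ByteBuffer msg, RpcResponseCallback cb) {
            cb.onSuccess(msg);  // echo the request payload back to the caller
          }

          @Override
          public StreamManager getStreamManager() {
            return new OneForOneStreamManager();
          }
        };

        TransportContext context = new TransportContext(conf, handler);
        TransportServer server = context.createServer();  // binds an ephemeral port
        TransportClient client =
          context.createClientFactory().createClient("localhost", server.getPort());

        ByteBuffer reply = client.sendRpcSync(ByteBuffer.wrap(new byte[] {1, 2, 3}), 5000);
        System.out.println("echoed " + reply.remaining() + " bytes");

        client.close();
        server.close();
      }
    }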

http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java
----------------------------------------------------------------------
diff --git a/common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java b/common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java
new file mode 100644
index 0000000..844eff4
--- /dev/null
+++ b/common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.buffer;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+
+import com.google.common.base.Objects;
+import com.google.common.io.ByteStreams;
+import io.netty.channel.DefaultFileRegion;
+
+import org.apache.spark.network.util.JavaUtils;
+import org.apache.spark.network.util.LimitedInputStream;
+import org.apache.spark.network.util.TransportConf;
+
+/**
+ * A {@link ManagedBuffer} backed by a segment in a file.
+ */
+public final class FileSegmentManagedBuffer extends ManagedBuffer {
+  private final TransportConf conf;
+  private final File file;
+  private final long offset;
+  private final long length;
+
+  public FileSegmentManagedBuffer(TransportConf conf, File file, long offset, long length) {
+    this.conf = conf;
+    this.file = file;
+    this.offset = offset;
+    this.length = length;
+  }
+
+  @Override
+  public long size() {
+    return length;
+  }
+
+  @Override
+  public ByteBuffer nioByteBuffer() throws IOException {
+    FileChannel channel = null;
+    try {
+      channel = new RandomAccessFile(file, "r").getChannel();
+      // Just copy the buffer if it's sufficiently small, as memory mapping has a high overhead.
+      if (length < conf.memoryMapBytes()) {
+        ByteBuffer buf = ByteBuffer.allocate((int) length);
+        channel.position(offset);
+        while (buf.remaining() != 0) {
+          if (channel.read(buf) == -1) {
+            throw new IOException(String.format("Reached EOF before filling buffer\n" +
+              "offset=%s\nfile=%s\nbuf.remaining=%s",
+              offset, file.getAbsoluteFile(), buf.remaining()));
+          }
+        }
+        buf.flip();
+        return buf;
+      } else {
+        return channel.map(FileChannel.MapMode.READ_ONLY, offset, length);
+      }
+    } catch (IOException e) {
+      try {
+        if (channel != null) {
+          long size = channel.size();
+          throw new IOException("Error in reading " + this + " (actual file length " + size + ")",
+            e);
+        }
+      } catch (IOException ignored) {
+        // ignore
+      }
+      throw new IOException("Error in opening " + this, e);
+    } finally {
+      JavaUtils.closeQuietly(channel);
+    }
+  }
+
+  @Override
+  public InputStream createInputStream() throws IOException {
+    FileInputStream is = null;
+    try {
+      is = new FileInputStream(file);
+      ByteStreams.skipFully(is, offset);
+      return new LimitedInputStream(is, length);
+    } catch (IOException e) {
+      try {
+        if (is != null) {
+          long size = file.length();
+          throw new IOException("Error in reading " + this + " (actual file length " + size + ")",
+              e);
+        }
+      } catch (IOException ignored) {
+        // ignore
+      } finally {
+        JavaUtils.closeQuietly(is);
+      }
+      throw new IOException("Error in opening " + this, e);
+    } catch (RuntimeException e) {
+      JavaUtils.closeQuietly(is);
+      throw e;
+    }
+  }
+
+  @Override
+  public ManagedBuffer retain() {
+    return this;
+  }
+
+  @Override
+  public ManagedBuffer release() {
+    return this;
+  }
+
+  @Override
+  public Object convertToNetty() throws IOException {
+    if (conf.lazyFileDescriptor()) {
+      return new LazyFileRegion(file, offset, length);
+    } else {
+      FileChannel fileChannel = new FileInputStream(file).getChannel();
+      return new DefaultFileRegion(fileChannel, offset, length);
+    }
+  }
+
+  public File getFile() { return file; }
+
+  public long getOffset() { return offset; }
+
+  public long getLength() { return length; }
+
+  @Override
+  public String toString() {
+    return Objects.toStringHelper(this)
+      .add("file", file)
+      .add("offset", offset)
+      .add("length", length)
+      .toString();
+  }
+}
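
As a usage sketch (the file path and offsets are hypothetical, and the TransportConf
constructor is assumed as above), a caller can expose a slice of a file and read it back:

    import java.io.File;
    import java.io.InputStream;
    import org.apache.spark.network.buffer.FileSegmentManagedBuffer;
    import org.apache.spark.network.util.SystemPropertyConfigProvider;
    import org.apache.spark.network.util.TransportConf;

    public class SegmentReadExample {
      public static void main(String[] args) throws Exception {
        TransportConf conf = new TransportConf("echo", new SystemPropertyConfigProvider());
        // Expose bytes [128, 128 + 4096) of a hypothetical data file as one buffer.
        FileSegmentManagedBuffer buf =
          new FileSegmentManagedBuffer(conf, new File("/tmp/shuffle.data"), 128, 4096);

        // nioByteBuffer() copies small segments and memory-maps large ones.
        System.out.println("segment holds " + buf.nioByteBuffer().remaining() + " bytes");

        // createInputStream() skips to the offset and caps reads with LimitedInputStream.
        try (InputStream in = buf.createInputStream()) {
          byte[] head = new byte[16];
          int n = in.read(head);
          System.out.println("read " + n + " bytes");
        }
      }
    }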

http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/common/network-common/src/main/java/org/apache/spark/network/buffer/LazyFileRegion.java
----------------------------------------------------------------------
diff --git a/common/network-common/src/main/java/org/apache/spark/network/buffer/LazyFileRegion.java b/common/network-common/src/main/java/org/apache/spark/network/buffer/LazyFileRegion.java
new file mode 100644
index 0000000..162cf6d
--- /dev/null
+++ b/common/network-common/src/main/java/org/apache/spark/network/buffer/LazyFileRegion.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.buffer;
+
+import java.io.FileInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.nio.channels.FileChannel;
+import java.nio.channels.WritableByteChannel;
+
+import com.google.common.base.Objects;
+import io.netty.channel.FileRegion;
+import io.netty.util.AbstractReferenceCounted;
+
+import org.apache.spark.network.util.JavaUtils;
+
+/**
+ * A FileRegion implementation that only creates the file descriptor when the region is being
+ * transferred. This cannot be used with Epoll because there is no native support for it.
+ *
+ * This is mostly copied from the DefaultFileRegion implementation in Netty. In the future, we
+ * should push this into Netty so the native Epoll transport can support this feature.
+ */
+public final class LazyFileRegion extends AbstractReferenceCounted implements FileRegion {
+
+  private final File file;
+  private final long position;
+  private final long count;
+
+  private FileChannel channel;
+
+  private long numBytesTransferred = 0L;
+
+  /**
+   * @param file file to transfer.
+   * @param position start position for the transfer.
+   * @param count number of bytes to transfer starting from position.
+   */
+  public LazyFileRegion(File file, long position, long count) {
+    this.file = file;
+    this.position = position;
+    this.count = count;
+  }
+
+  @Override
+  protected void deallocate() {
+    JavaUtils.closeQuietly(channel);
+  }
+
+  @Override
+  public long position() {
+    return position;
+  }
+
+  @Override
+  public long transfered() {
+    return numBytesTransferred;
+  }
+
+  @Override
+  public long count() {
+    return count;
+  }
+
+  @Override
+  public long transferTo(WritableByteChannel target, long position) throws IOException {
+    if (channel == null) {
+      channel = new FileInputStream(file).getChannel();
+    }
+
+    long count = this.count - position;
+    if (count < 0 || position < 0) {
+      throw new IllegalArgumentException(
+          "position out of range: " + position + " (expected: 0 - " + (count - 1) + ')');
+    }
+
+    if (count == 0) {
+      return 0L;
+    }
+
+    long written = channel.transferTo(this.position + position, count, target);
+    if (written > 0) {
+      numBytesTransferred += written;
+    }
+    return written;
+  }
+
+  @Override
+  public String toString() {
+    return Objects.toStringHelper(this)
+        .add("file", file)
+        .add("position", position)
+        .add("count", count)
+        .toString();
+  }
+}
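
To illustrate the lazy behavior, the region can be driven by hand against any
WritableByteChannel; no file descriptor is opened until the first transferTo() call. The
paths are hypothetical and the sketch assumes the input file holds at least 1024 bytes:

    import java.io.File;
    import java.io.FileOutputStream;
    import java.nio.channels.Channels;
    import java.nio.channels.WritableByteChannel;
    import org.apache.spark.network.buffer.LazyFileRegion;

    public class LazyRegionExample {
      public static void main(String[] args) throws Exception {
        LazyFileRegion region = new LazyFileRegion(new File("/tmp/in.bin"), 0, 1024);
        try (FileOutputStream out = new FileOutputStream("/tmp/out.bin")) {
          WritableByteChannel sink = Channels.newChannel(out);
          long written = 0;
          while (written < region.count()) {
            written += region.transferTo(sink, written);  // position is region-relative
          }
        } finally {
          region.release();  // refcount reaches zero; deallocate() closes the channel
        }
      }
    }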

http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/common/network-common/src/main/java/org/apache/spark/network/buffer/ManagedBuffer.java
----------------------------------------------------------------------
diff --git a/common/network-common/src/main/java/org/apache/spark/network/buffer/ManagedBuffer.java b/common/network-common/src/main/java/org/apache/spark/network/buffer/ManagedBuffer.java
new file mode 100644
index 0000000..1861f8d
--- /dev/null
+++ b/common/network-common/src/main/java/org/apache/spark/network/buffer/ManagedBuffer.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.buffer;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * This class provides an immutable view of data in the form of bytes. The implementation
+ * should specify how the data is provided:
+ *
+ * - {@link FileSegmentManagedBuffer}: data backed by part of a file
+ * - {@link NioManagedBuffer}: data backed by a NIO ByteBuffer
+ * - {@link NettyManagedBuffer}: data backed by a Netty ByteBuf
+ *
+ * The concrete buffer implementation might be managed outside the JVM garbage collector.
+ * For example, in the case of {@link NettyManagedBuffer}, the buffers are reference counted.
+ * In that case, if the buffer is going to be passed around to a different thread, retain/release
+ * should be called.
+ */
+public abstract class ManagedBuffer {
+
+  /** Number of bytes of the data. */
+  public abstract long size();
+
+  /**
+   * Exposes this buffer's data as an NIO ByteBuffer. Changing the position and limit of the
+   * returned ByteBuffer should not affect the content of this buffer.
+   */
+  // TODO: Deprecate this, usage may require expensive memory mapping or allocation.
+  public abstract ByteBuffer nioByteBuffer() throws IOException;
+
+  /**
+   * Exposes this buffer's data as an InputStream. The underlying implementation does not
+   * necessarily check for the length of bytes read, so the caller is responsible for making sure
+   * it does not go over the limit.
+   */
+  public abstract InputStream createInputStream() throws IOException;
+
+  /**
+   * Increment the reference count by one if applicable.
+   */
+  public abstract ManagedBuffer retain();
+
+  /**
+   * If applicable, decrement the reference count by one and deallocate the buffer if the
+   * reference count reaches zero.
+   */
+  public abstract ManagedBuffer release();
+
+  /**
+   * Convert the buffer into a Netty object, used to write the data out. The return value is either
+   * a {@link io.netty.buffer.ByteBuf} or a {@link io.netty.channel.FileRegion}.
+   *
+   * If this method returns a ByteBuf, then that buffer's reference count will be incremented and
+   * the caller will be responsible for releasing this new reference.
+   */
+  public abstract Object convertToNetty() throws IOException;
+}
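
To make the contract concrete, a toy heap-backed implementation (hypothetical, not part of
this commit) only needs the six methods above:

    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.nio.ByteBuffer;
    import io.netty.buffer.Unpooled;
    import org.apache.spark.network.buffer.ManagedBuffer;

    /** A toy ManagedBuffer over a heap byte[]; the JVM garbage collector manages it. */
    class ByteArrayManagedBuffer extends ManagedBuffer {
      private final byte[] data;

      ByteArrayManagedBuffer(byte[] data) { this.data = data; }

      @Override public long size() { return data.length; }

      @Override public ByteBuffer nioByteBuffer() throws IOException {
        return ByteBuffer.wrap(data);
      }

      @Override public InputStream createInputStream() throws IOException {
        return new ByteArrayInputStream(data);
      }

      // GC-managed data, so retain/release are no-ops (as in NioManagedBuffer below).
      @Override public ManagedBuffer retain() { return this; }
      @Override public ManagedBuffer release() { return this; }

      @Override public Object convertToNetty() throws IOException {
        return Unpooled.wrappedBuffer(data);  // a ByteBuf, one of the two allowed types
      }
    }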

http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/common/network-common/src/main/java/org/apache/spark/network/buffer/NettyManagedBuffer.java
----------------------------------------------------------------------
diff --git a/common/network-common/src/main/java/org/apache/spark/network/buffer/NettyManagedBuffer.java b/common/network-common/src/main/java/org/apache/spark/network/buffer/NettyManagedBuffer.java
new file mode 100644
index 0000000..4c8802a
--- /dev/null
+++ b/common/network-common/src/main/java/org/apache/spark/network/buffer/NettyManagedBuffer.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.buffer;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+import com.google.common.base.Objects;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufInputStream;
+
+/**
+ * A {@link ManagedBuffer} backed by a Netty {@link ByteBuf}.
+ */
+public final class NettyManagedBuffer extends ManagedBuffer {
+  private final ByteBuf buf;
+
+  public NettyManagedBuffer(ByteBuf buf) {
+    this.buf = buf;
+  }
+
+  @Override
+  public long size() {
+    return buf.readableBytes();
+  }
+
+  @Override
+  public ByteBuffer nioByteBuffer() throws IOException {
+    return buf.nioBuffer();
+  }
+
+  @Override
+  public InputStream createInputStream() throws IOException {
+    return new ByteBufInputStream(buf);
+  }
+
+  @Override
+  public ManagedBuffer retain() {
+    buf.retain();
+    return this;
+  }
+
+  @Override
+  public ManagedBuffer release() {
+    buf.release();
+    return this;
+  }
+
+  @Override
+  public Object convertToNetty() throws IOException {
+    return buf.duplicate().retain();
+  }
+
+  @Override
+  public String toString() {
+    return Objects.toStringHelper(this)
+      .add("buf", buf)
+      .toString();
+  }
+}
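
A short sketch of the reference-counting behavior this wrapper delegates to the underlying
ByteBuf (hypothetical payload; not part of this commit):

    import java.nio.charset.StandardCharsets;
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import org.apache.spark.network.buffer.ManagedBuffer;
    import org.apache.spark.network.buffer.NettyManagedBuffer;

    public class RefCountExample {
      public static void main(String[] args) throws Exception {
        ByteBuf bytes = Unpooled.copiedBuffer("payload", StandardCharsets.UTF_8);  // refCnt == 1
        ManagedBuffer buf = new NettyManagedBuffer(bytes);

        buf.retain();                    // refCnt == 2: safe to hand to another thread
        System.out.println(buf.size()); // 7 readable bytes
        buf.release();                   // refCnt == 1
        buf.release();                   // refCnt == 0: the ByteBuf is deallocated
      }
    }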

http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/common/network-common/src/main/java/org/apache/spark/network/buffer/NioManagedBuffer.java
----------------------------------------------------------------------
diff --git a/common/network-common/src/main/java/org/apache/spark/network/buffer/NioManagedBuffer.java b/common/network-common/src/main/java/org/apache/spark/network/buffer/NioManagedBuffer.java
new file mode 100644
index 0000000..631d767
--- /dev/null
+++ b/common/network-common/src/main/java/org/apache/spark/network/buffer/NioManagedBuffer.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.buffer;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+import com.google.common.base.Objects;
+import io.netty.buffer.ByteBufInputStream;
+import io.netty.buffer.Unpooled;
+
+/**
+ * A {@link ManagedBuffer} backed by {@link ByteBuffer}.
+ */
+public class NioManagedBuffer extends ManagedBuffer {
+  private final ByteBuffer buf;
+
+  public NioManagedBuffer(ByteBuffer buf) {
+    this.buf = buf;
+  }
+
+  @Override
+  public long size() {
+    return buf.remaining();
+  }
+
+  @Override
+  public ByteBuffer nioByteBuffer() throws IOException {
+    return buf.duplicate();
+  }
+
+  @Override
+  public InputStream createInputStream() throws IOException {
+    return new ByteBufInputStream(Unpooled.wrappedBuffer(buf));
+  }
+
+  @Override
+  public ManagedBuffer retain() {
+    return this;
+  }
+
+  @Override
+  public ManagedBuffer release() {
+    return this;
+  }
+
+  @Override
+  public Object convertToNetty() throws IOException {
+    return Unpooled.wrappedBuffer(buf);
+  }
+
+  @Override
+  public String toString() {
+    return Objects.toStringHelper(this)
+      .add("buf", buf)
+      .toString();
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/common/network-common/src/main/java/org/apache/spark/network/client/ChunkFetchFailureException.java
----------------------------------------------------------------------
diff --git a/common/network-common/src/main/java/org/apache/spark/network/client/ChunkFetchFailureException.java b/common/network-common/src/main/java/org/apache/spark/network/client/ChunkFetchFailureException.java
new file mode 100644
index 0000000..1fbdcd6
--- /dev/null
+++ b/common/network-common/src/main/java/org/apache/spark/network/client/ChunkFetchFailureException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.client;
+
+/**
+ * General exception caused by a remote exception while fetching a chunk.
+ */
+public class ChunkFetchFailureException extends RuntimeException {
+  public ChunkFetchFailureException(String errorMsg, Throwable cause) {
+    super(errorMsg, cause);
+  }
+
+  public ChunkFetchFailureException(String errorMsg) {
+    super(errorMsg);
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/common/network-common/src/main/java/org/apache/spark/network/client/ChunkReceivedCallback.java
----------------------------------------------------------------------
diff --git a/common/network-common/src/main/java/org/apache/spark/network/client/ChunkReceivedCallback.java b/common/network-common/src/main/java/org/apache/spark/network/client/ChunkReceivedCallback.java
new file mode 100644
index 0000000..519e6cb
--- /dev/null
+++ b/common/network-common/src/main/java/org/apache/spark/network/client/ChunkReceivedCallback.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.client;
+
+import org.apache.spark.network.buffer.ManagedBuffer;
+
+/**
+ * Callback for the result of a single chunk fetch. For a single stream, the callbacks are
+ * guaranteed to be called by the same thread in the same order as the requests for chunks were
+ * made.
+ *
+ * Note that if a general stream failure occurs, all outstanding chunk requests may be failed.
+ */
+public interface ChunkReceivedCallback {
+  /**
+   * Called upon receipt of a particular chunk.
+   *
+   * The given buffer will initially have a refcount of 1, but will be release()'d as soon as this
+   * call returns. You must therefore either retain() the buffer or copy its contents before
+   * returning.
+   */
+  void onSuccess(int chunkIndex, ManagedBuffer buffer);
+
+  /**
+   * Called upon failure to fetch a particular chunk. Note that this may actually be called due
+   * to failure to fetch a prior chunk in this stream.
+   *
+   * After receiving a failure, the stream may or may not be valid. The client should not assume
+   * that the server's side of the stream has been closed.
+   */
+  void onFailure(int chunkIndex, Throwable e);
+}
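
Given the retain-or-copy rule above, a sketch of a callback that hands chunks to a consumer
thread might look like this (hypothetical class, not part of this commit); the consumer is
responsible for release()-ing each buffer once it is done:

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import org.apache.spark.network.buffer.ManagedBuffer;
    import org.apache.spark.network.client.ChunkReceivedCallback;

    class QueueingChunkCallback implements ChunkReceivedCallback {
      final BlockingQueue<ManagedBuffer> chunks = new LinkedBlockingQueue<>();

      @Override
      public void onSuccess(int chunkIndex, ManagedBuffer buffer) {
        // The buffer is release()'d as soon as this call returns, so retain it
        // before handing it off; the consumer must release() it when finished.
        chunks.add(buffer.retain());
      }

      @Override
      public void onFailure(int chunkIndex, Throwable e) {
        // May be triggered by a failure on an earlier chunk of the same stream.
        System.err.println("chunk " + chunkIndex + " failed: " + e);
      }
    }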

http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/common/network-common/src/main/java/org/apache/spark/network/client/RpcResponseCallback.java
----------------------------------------------------------------------
diff --git a/common/network-common/src/main/java/org/apache/spark/network/client/RpcResponseCallback.java b/common/network-common/src/main/java/org/apache/spark/network/client/RpcResponseCallback.java
new file mode 100644
index 0000000..47e93f9
--- /dev/null
+++ b/common/network-common/src/main/java/org/apache/spark/network/client/RpcResponseCallback.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.client;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Callback for the result of a single RPC. This will be invoked once with either success or
+ * failure.
+ */
+public interface RpcResponseCallback {
+  /** Successful serialized result from server. */
+  void onSuccess(ByteBuffer response);
+
+  /** Exception either propagated from server or raised on client side. */
+  void onFailure(Throwable e);
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/common/network-common/src/main/java/org/apache/spark/network/client/StreamCallback.java
----------------------------------------------------------------------
diff --git a/common/network-common/src/main/java/org/apache/spark/network/client/StreamCallback.java b/common/network-common/src/main/java/org/apache/spark/network/client/StreamCallback.java
new file mode 100644
index 0000000..29e6a30
--- /dev/null
+++ b/common/network-common/src/main/java/org/apache/spark/network/client/StreamCallback.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.client;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * Callback for streaming data. Stream data will be offered to the {@link #onData(String, ByteBuffer)}
+ * method as it arrives. Once all the stream data is received, {@link #onComplete(String)} will be
+ * called.
+ * <p>
+ * The network library guarantees that a single thread will call these methods at a time, but
+ * different calls may be made by different threads.
+ */
+public interface StreamCallback {
+  /** Called upon receipt of stream data. */
+  void onData(String streamId, ByteBuffer buf) throws IOException;
+
+  /** Called when all data from the stream has been received. */
+  void onComplete(String streamId) throws IOException;
+
+  /** Called if there's an error reading data from the stream. */
+  void onFailure(String streamId, Throwable cause) throws IOException;
+}
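
Because delivery is single-threaded and the buffer passed to onData() is not guaranteed to
remain valid after the call returns, an implementation should drain each buffer immediately.
A sketch that spools a stream into a channel (hypothetical class, not part of this commit):

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.WritableByteChannel;
    import org.apache.spark.network.client.StreamCallback;

    class ChannelStreamCallback implements StreamCallback {
      private final WritableByteChannel out;

      ChannelStreamCallback(WritableByteChannel out) { this.out = out; }

      @Override
      public void onData(String streamId, ByteBuffer buf) throws IOException {
        while (buf.hasRemaining()) {
          out.write(buf);  // consume the bytes now; they may be reclaimed afterwards
        }
      }

      @Override
      public void onComplete(String streamId) throws IOException {
        out.close();
      }

      @Override
      public void onFailure(String streamId, Throwable cause) throws IOException {
        out.close();  // abandon the partial output; a real impl might also delete it
      }
    }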

http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/common/network-common/src/main/java/org/apache/spark/network/client/StreamInterceptor.java
----------------------------------------------------------------------
diff --git a/common/network-common/src/main/java/org/apache/spark/network/client/StreamInterceptor.java b/common/network-common/src/main/java/org/apache/spark/network/client/StreamInterceptor.java
new file mode 100644
index 0000000..88ba3cc
--- /dev/null
+++ b/common/network-common/src/main/java/org/apache/spark/network/client/StreamInterceptor.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.client;
+
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
+
+import io.netty.buffer.ByteBuf;
+
+import org.apache.spark.network.util.TransportFrameDecoder;
+
+/**
+ * An interceptor that is registered with the frame decoder to feed stream data to a
+ * callback.
+ */
+class StreamInterceptor implements TransportFrameDecoder.Interceptor {
+
+  private final TransportResponseHandler handler;
+  private final String streamId;
+  private final long byteCount;
+  private final StreamCallback callback;
+
+  private volatile long bytesRead;
+
+  StreamInterceptor(
+      TransportResponseHandler handler,
+      String streamId,
+      long byteCount,
+      StreamCallback callback) {
+    this.handler = handler;
+    this.streamId = streamId;
+    this.byteCount = byteCount;
+    this.callback = callback;
+    this.bytesRead = 0;
+  }
+
+  @Override
+  public void exceptionCaught(Throwable cause) throws Exception {
+    handler.deactivateStream();
+    callback.onFailure(streamId, cause);
+  }
+
+  @Override
+  public void channelInactive() throws Exception {
+    handler.deactivateStream();
+    callback.onFailure(streamId, new ClosedChannelException());
+  }
+
+  @Override
+  public boolean handle(ByteBuf buf) throws Exception {
+    int toRead = (int) Math.min(buf.readableBytes(), byteCount - bytesRead);
+    ByteBuffer nioBuffer = buf.readSlice(toRead).nioBuffer();
+
+    int available = nioBuffer.remaining();
+    callback.onData(streamId, nioBuffer);
+    bytesRead += available;
+    if (bytesRead > byteCount) {
+      RuntimeException re = new IllegalStateException(String.format(
+        "Read too many bytes? Expected %d, but read %d.", byteCount, bytesRead));
+      callback.onFailure(streamId, re);
+      handler.deactivateStream();
+      throw re;
+    } else if (bytesRead == byteCount) {
+      handler.deactivateStream();
+      callback.onComplete(streamId);
+    }
+
+    return bytesRead != byteCount;
+  }
+
+}
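
The handle() contract above is worth spelling out: it returns true while the stream still expects bytes (bytesRead < byteCount) and false once the stream is fully consumed, at which point the frame decoder can uninstall the interceptor and resume normal frame decoding. A hypothetical driver loop (a sketch of the assumed contract, not code from this commit's TransportFrameDecoder) would keep feeding the interceptor until it signals completion:

import io.netty.buffer.ByteBuf;

import org.apache.spark.network.util.TransportFrameDecoder;

// Hypothetical driver illustrating the Interceptor contract.
final class InterceptorDriver {
  static void feed(TransportFrameDecoder.Interceptor interceptor, ByteBuf data)
      throws Exception {
    boolean stillActive = true;
    while (stillActive && data.isReadable()) {
      // handle() consumes at most the bytes the stream still expects and
      // returns false once the stream is complete.
      stillActive = interceptor.handle(data);
    }
  }
}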

http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java
----------------------------------------------------------------------
diff --git a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java
new file mode 100644
index 0000000..e15f096
--- /dev/null
+++ b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.client;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.SettableFuture;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.spark.network.buffer.NioManagedBuffer;
+import org.apache.spark.network.protocol.ChunkFetchRequest;
+import org.apache.spark.network.protocol.OneWayMessage;
+import org.apache.spark.network.protocol.RpcRequest;
+import org.apache.spark.network.protocol.StreamChunkId;
+import org.apache.spark.network.protocol.StreamRequest;
+import org.apache.spark.network.util.NettyUtils;
+
+/**
+ * Client for fetching consecutive chunks of a pre-negotiated stream. This API is intended to allow
+ * efficient transfer of a large amount of data, broken up into chunks with sizes ranging from
+ * hundreds of KB to a few MB.
+ *
+ * Note that while this client deals with the fetching of chunks from a stream (i.e., data plane),
+ * the actual setup of the streams is done outside the scope of the transport layer. The convenience
+ * method "sendRPC" is provided to enable control plane communication between the client and server
+ * to perform this setup.
+ *
+ * For example, a typical workflow might be:
+ * client.sendRpc(new OpenFile("/foo")) --&gt; returns StreamId = 100
+ * client.fetchChunk(streamId = 100, chunkIndex = 0, callback)
+ * client.fetchChunk(streamId = 100, chunkIndex = 1, callback)
+ * ...
+ * client.sendRpc(new CloseStream(100))
+ *
+ * Construct an instance of TransportClient using {@link TransportClientFactory}. A single
+ * TransportClient may be used for multiple streams, but any given stream must be restricted to a
+ * single client, in order to avoid out-of-order responses.
+ *
+ * NB: This class is used to make requests to the server, while {@link TransportResponseHandler} is
+ * responsible for handling responses from the server.
+ *
+ * Concurrency: thread safe and can be called from multiple threads.
+ */
+public class TransportClient implements Closeable {
+  private final Logger logger = LoggerFactory.getLogger(TransportClient.class);
+
+  private final Channel channel;
+  private final TransportResponseHandler handler;
+  @Nullable private String clientId;
+  private volatile boolean timedOut;
+
+  public TransportClient(Channel channel, TransportResponseHandler handler) {
+    this.channel = Preconditions.checkNotNull(channel);
+    this.handler = Preconditions.checkNotNull(handler);
+    this.timedOut = false;
+  }
+
+  public Channel getChannel() {
+    return channel;
+  }
+
+  public boolean isActive() {
+    return !timedOut && (channel.isOpen() || channel.isActive());
+  }
+
+  public SocketAddress getSocketAddress() {
+    return channel.remoteAddress();
+  }
+
+  /**
+   * Returns the ID used by the client to authenticate itself when authentication is enabled.
+   *
+   * @return The client ID, or null if authentication is disabled.
+   */
+  public String getClientId() {
+    return clientId;
+  }
+
+  /**
+   * Sets the authenticated client ID. This is meant to be used by the authentication layer.
+   *
+   * Trying to set a different client ID after it's been set will result in an exception.
+   */
+  public void setClientId(String id) {
+    Preconditions.checkState(clientId == null, "Client ID has already been set.");
+    this.clientId = id;
+  }
+
+  /**
+   * Requests a single chunk from the remote side, from the pre-negotiated streamId.
+   *
+   * Chunk indices go from 0 onwards. It is valid to request the same chunk multiple times, though
+   * some streams may not support this.
+   *
+   * Multiple fetchChunk requests may be outstanding simultaneously, and the chunks are guaranteed
+   * to be returned in the same order that they were requested, assuming only a single
+   * TransportClient is used to fetch the chunks.
+   *
+   * @param streamId Identifier that refers to a stream in the remote StreamManager. This should
+   *                 be agreed upon by client and server beforehand.
+   * @param chunkIndex 0-based index of the chunk to fetch
+   * @param callback Callback invoked upon successful receipt of chunk, or upon any failure.
+   */
+  public void fetchChunk(
+      long streamId,
+      final int chunkIndex,
+      final ChunkReceivedCallback callback) {
+    final String serverAddr = NettyUtils.getRemoteAddress(channel);
+    final long startTime = System.currentTimeMillis();
+    logger.debug("Sending fetch chunk request {} to {}", chunkIndex, serverAddr);
+
+    final StreamChunkId streamChunkId = new StreamChunkId(streamId, chunkIndex);
+    handler.addFetchRequest(streamChunkId, callback);
+
+    channel.writeAndFlush(new ChunkFetchRequest(streamChunkId)).addListener(
+      new ChannelFutureListener() {
+        @Override
+        public void operationComplete(ChannelFuture future) throws Exception {
+          if (future.isSuccess()) {
+            long timeTaken = System.currentTimeMillis() - startTime;
+            logger.trace("Sending request {} to {} took {} ms", streamChunkId, serverAddr,
+              timeTaken);
+          } else {
+            String errorMsg = String.format("Failed to send request %s to %s: %s", streamChunkId,
+              serverAddr, future.cause());
+            logger.error(errorMsg, future.cause());
+            handler.removeFetchRequest(streamChunkId);
+            channel.close();
+            try {
+              callback.onFailure(chunkIndex, new IOException(errorMsg, future.cause()));
+            } catch (Exception e) {
+              logger.error("Uncaught exception in RPC response callback handler!", e);
+            }
+          }
+        }
+      });
+  }
+
+  /**
+   * Request to stream the data with the given stream ID from the remote end.
+   *
+   * @param streamId The stream to fetch.
+   * @param callback Object to call with the stream data.
+   */
+  public void stream(final String streamId, final StreamCallback callback) {
+    final String serverAddr = NettyUtils.getRemoteAddress(channel);
+    final long startTime = System.currentTimeMillis();
+    logger.debug("Sending stream request for {} to {}", streamId, serverAddr);
+
+    // Need to synchronize here so that the callback is added to the queue and the RPC is
+    // written to the socket atomically, so that callbacks are called in the right order
+    // when responses arrive.
+    synchronized (this) {
+      handler.addStreamCallback(callback);
+      channel.writeAndFlush(new StreamRequest(streamId)).addListener(
+        new ChannelFutureListener() {
+          @Override
+          public void operationComplete(ChannelFuture future) throws Exception {
+            if (future.isSuccess()) {
+              long timeTaken = System.currentTimeMillis() - startTime;
+              logger.trace("Sending request for {} to {} took {} ms", streamId, serverAddr,
+                timeTaken);
+            } else {
+              String errorMsg = String.format("Failed to send request for %s to %s: %s", streamId,
+                serverAddr, future.cause());
+              logger.error(errorMsg, future.cause());
+              channel.close();
+              try {
+                callback.onFailure(streamId, new IOException(errorMsg, future.cause()));
+              } catch (Exception e) {
+                logger.error("Uncaught exception in RPC response callback handler!", e);
+              }
+            }
+          }
+        });
+    }
+  }
+
+  /**
+   * Sends an opaque message to the RpcHandler on the server-side. The callback will be invoked
+   * with the server's response or upon any failure.
+   *
+   * @param message The message to send.
+   * @param callback Callback to handle the RPC's reply.
+   * @return The RPC's id.
+   */
+  public long sendRpc(ByteBuffer message, final RpcResponseCallback callback) {
+    final String serverAddr = NettyUtils.getRemoteAddress(channel);
+    final long startTime = System.currentTimeMillis();
+    logger.trace("Sending RPC to {}", serverAddr);
+
+    final long requestId = Math.abs(UUID.randomUUID().getLeastSignificantBits());
+    handler.addRpcRequest(requestId, callback);
+
+    channel.writeAndFlush(new RpcRequest(requestId, new NioManagedBuffer(message))).addListener(
+      new ChannelFutureListener() {
+        @Override
+        public void operationComplete(ChannelFuture future) throws Exception {
+          if (future.isSuccess()) {
+            long timeTaken = System.currentTimeMillis() - startTime;
+            logger.trace("Sending request {} to {} took {} ms", requestId, serverAddr, timeTaken);
+          } else {
+            String errorMsg = String.format("Failed to send RPC %s to %s: %s", requestId,
+              serverAddr, future.cause());
+            logger.error(errorMsg, future.cause());
+            handler.removeRpcRequest(requestId);
+            channel.close();
+            try {
+              callback.onFailure(new IOException(errorMsg, future.cause()));
+            } catch (Exception e) {
+              logger.error("Uncaught exception in RPC response callback handler!", e);
+            }
+          }
+        }
+      });
+
+    return requestId;
+  }
+
+  /**
+   * Synchronously sends an opaque message to the RpcHandler on the server-side, waiting for up to
+   * a specified timeout for a response.
+   */
+  public ByteBuffer sendRpcSync(ByteBuffer message, long timeoutMs) {
+    final SettableFuture<ByteBuffer> result = SettableFuture.create();
+
+    sendRpc(message, new RpcResponseCallback() {
+      @Override
+      public void onSuccess(ByteBuffer response) {
+        result.set(response);
+      }
+
+      @Override
+      public void onFailure(Throwable e) {
+        result.setException(e);
+      }
+    });
+
+    try {
+      return result.get(timeoutMs, TimeUnit.MILLISECONDS);
+    } catch (ExecutionException e) {
+      throw Throwables.propagate(e.getCause());
+    } catch (Exception e) {
+      throw Throwables.propagate(e);
+    }
+  }
+
+  /**
+   * Sends an opaque message to the RpcHandler on the server-side. No reply is expected for the
+   * message, and no delivery guarantees are made.
+   *
+   * @param message The message to send.
+   */
+  public void send(ByteBuffer message) {
+    channel.writeAndFlush(new OneWayMessage(new NioManagedBuffer(message)));
+  }
+
+  /**
+   * Removes any state associated with the given RPC.
+   *
+   * @param requestId The RPC id returned by {@link #sendRpc(ByteBuffer, RpcResponseCallback)}.
+   */
+  public void removeRpcRequest(long requestId) {
+    handler.removeRpcRequest(requestId);
+  }
+
+  /** Mark this channel as having timed out. */
+  public void timeOut() {
+    this.timedOut = true;
+  }
+
+  @VisibleForTesting
+  public TransportResponseHandler getHandler() {
+    return handler;
+  }
+
+  @Override
+  public void close() {
+    // close is a local operation and should finish within milliseconds; timeout just to be safe
+    channel.close().awaitUninterruptibly(10, TimeUnit.SECONDS);
+  }
+
+  @Override
+  public String toString() {
+    return Objects.toStringHelper(this)
+      .add("remoteAdress", channel.remoteAddress())
+      .add("clientId", clientId)
+      .add("isActive", isActive())
+      .toString();
+  }
+}
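
As a usage sketch (hypothetical, and assuming an already-connected client plus a server-side stream registered under ID 100), the control-plane/data-plane split described in the class comment might play out like this:

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

import org.apache.spark.network.buffer.ManagedBuffer;

final class ClientDemo {
  static void demo(TransportClient client) {
    // Control plane: negotiate a stream via a synchronous RPC (the payload
    // format is application-defined; "open /foo" is a made-up example).
    ByteBuffer request = ByteBuffer.wrap("open /foo".getBytes(StandardCharsets.UTF_8));
    ByteBuffer response = client.sendRpcSync(request, 5000 /* timeout ms */);

    // Data plane: fetch chunk 0 of the (assumed) stream 100.
    client.fetchChunk(100L, 0, new ChunkReceivedCallback() {
      @Override
      public void onSuccess(int chunkIndex, ManagedBuffer buffer) {
        // Call buffer.retain() if the data must outlive this callback.
      }

      @Override
      public void onFailure(int chunkIndex, Throwable e) {
        // E.g. log and retry on a fresh client.
      }
    });
  }
}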

http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientBootstrap.java
----------------------------------------------------------------------
diff --git a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientBootstrap.java b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientBootstrap.java
new file mode 100644
index 0000000..eaae2ee
--- /dev/null
+++ b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientBootstrap.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.client;
+
+import io.netty.channel.Channel;
+
+/**
+ * A bootstrap which is executed on a TransportClient before it is returned to the user.
+ * This enables an initial exchange of information (e.g., SASL authentication tokens) on a once-per-
+ * connection basis.
+ *
+ * Since connections (and TransportClients) are reused as much as possible, it is generally
+ * reasonable to perform an expensive bootstrapping operation, as they often share a lifespan with
+ * the JVM itself.
+ */
+public interface TransportClientBootstrap {
+  /** Performs the bootstrapping operation, throwing an exception on failure. */
+  void doBootstrap(TransportClient client, Channel channel) throws RuntimeException;
+}
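
A concrete bootstrap in this commit is SaslClientBootstrap; as a simpler hypothetical sketch, a bootstrap could tag every new connection with an application ID:

import io.netty.channel.Channel;

// Hypothetical bootstrap: runs once per physical connection, before the
// client is handed out by the factory.
class ClientIdBootstrap implements TransportClientBootstrap {
  private final String appId;

  ClientIdBootstrap(String appId) {
    this.appId = appId;
  }

  @Override
  public void doBootstrap(TransportClient client, Channel channel) {
    client.setClientId(appId);  // see TransportClient#setClientId
  }
}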

http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
----------------------------------------------------------------------
diff --git a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
new file mode 100644
index 0000000..61bafc8
--- /dev/null
+++ b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.client;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Lists;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.spark.network.TransportContext;
+import org.apache.spark.network.server.TransportChannelHandler;
+import org.apache.spark.network.util.IOMode;
+import org.apache.spark.network.util.JavaUtils;
+import org.apache.spark.network.util.NettyUtils;
+import org.apache.spark.network.util.TransportConf;
+
+/**
+ * Factory for creating {@link TransportClient}s by using createClient.
+ *
+ * The factory maintains a pool of connections to other hosts and, where possible, returns
+ * an existing TransportClient for the same remote host. It also shares a single worker thread
+ * pool across all TransportClients.
+ *
+ * TransportClients will be reused whenever possible. Prior to completing the creation of a new
+ * TransportClient, all given {@link TransportClientBootstrap}s will be run.
+ */
+public class TransportClientFactory implements Closeable {
+
+  /** A simple data structure to track the pool of clients between two peer nodes. */
+  private static class ClientPool {
+    TransportClient[] clients;
+    Object[] locks;
+
+    public ClientPool(int size) {
+      clients = new TransportClient[size];
+      locks = new Object[size];
+      for (int i = 0; i < size; i++) {
+        locks[i] = new Object();
+      }
+    }
+  }
+
+  private final Logger logger = LoggerFactory.getLogger(TransportClientFactory.class);
+
+  private final TransportContext context;
+  private final TransportConf conf;
+  private final List<TransportClientBootstrap> clientBootstraps;
+  private final ConcurrentHashMap<SocketAddress, ClientPool> connectionPool;
+
+  /** Random number generator for picking connections between peers. */
+  private final Random rand;
+  private final int numConnectionsPerPeer;
+
+  private final Class<? extends Channel> socketChannelClass;
+  private EventLoopGroup workerGroup;
+  private PooledByteBufAllocator pooledAllocator;
+
+  public TransportClientFactory(
+      TransportContext context,
+      List<TransportClientBootstrap> clientBootstraps) {
+    this.context = Preconditions.checkNotNull(context);
+    this.conf = context.getConf();
+    this.clientBootstraps = Lists.newArrayList(Preconditions.checkNotNull(clientBootstraps));
+    this.connectionPool = new ConcurrentHashMap<SocketAddress, ClientPool>();
+    this.numConnectionsPerPeer = conf.numConnectionsPerPeer();
+    this.rand = new Random();
+
+    IOMode ioMode = IOMode.valueOf(conf.ioMode());
+    this.socketChannelClass = NettyUtils.getClientChannelClass(ioMode);
+    // TODO: Make thread pool name configurable.
+    this.workerGroup = NettyUtils.createEventLoop(ioMode, conf.clientThreads(), "shuffle-client");
+    this.pooledAllocator = NettyUtils.createPooledByteBufAllocator(
+      conf.preferDirectBufs(), false /* allowCache */, conf.clientThreads());
+  }
+
+  /**
+   * Create a {@link TransportClient} connecting to the given remote host / port.
+   *
+   * We maintain an array of clients (size determined by spark.shuffle.io.numConnectionsPerPeer)
+   * and randomly pick one to use. If no client was previously created in the randomly selected
+   * spot, this function creates a new client and places it there.
+   *
+   * Prior to the creation of a new TransportClient, we will execute all
+   * {@link TransportClientBootstrap}s that are registered with this factory.
+   *
+   * This blocks until a connection is successfully established and fully bootstrapped.
+   *
+   * Concurrency: This method is safe to call from multiple threads.
+   */
+  public TransportClient createClient(String remoteHost, int remotePort) throws IOException {
+    // Get connection from the connection pool first.
+    // If it is not found or not active, create a new one.
+    final InetSocketAddress address = new InetSocketAddress(remoteHost, remotePort);
+
+    // Create the ClientPool if we don't have it yet.
+    ClientPool clientPool = connectionPool.get(address);
+    if (clientPool == null) {
+      connectionPool.putIfAbsent(address, new ClientPool(numConnectionsPerPeer));
+      clientPool = connectionPool.get(address);
+    }
+
+    int clientIndex = rand.nextInt(numConnectionsPerPeer);
+    TransportClient cachedClient = clientPool.clients[clientIndex];
+
+    if (cachedClient != null && cachedClient.isActive()) {
+      // Make sure that the channel will not timeout by updating the last use time of the
+      // handler. Then check that the client is still alive, in case it timed out before
+      // this code was able to update things.
+      TransportChannelHandler handler = cachedClient.getChannel().pipeline()
+        .get(TransportChannelHandler.class);
+      synchronized (handler) {
+        handler.getResponseHandler().updateTimeOfLastRequest();
+      }
+
+      if (cachedClient.isActive()) {
+        logger.trace("Returning cached connection to {}: {}", address, cachedClient);
+        return cachedClient;
+      }
+    }
+
+    // If we reach here, we don't have an existing connection open. Let's create a new one.
+    // Multiple threads might race here to create new connections. Keep only one of them active.
+    synchronized (clientPool.locks[clientIndex]) {
+      cachedClient = clientPool.clients[clientIndex];
+
+      if (cachedClient != null) {
+        if (cachedClient.isActive()) {
+          logger.trace("Returning cached connection to {}: {}", address, cachedClient);
+          return cachedClient;
+        } else {
+          logger.info("Found inactive connection to {}, creating a new one.", address);
+        }
+      }
+      clientPool.clients[clientIndex] = createClient(address);
+      return clientPool.clients[clientIndex];
+    }
+  }
+
+  /**
+   * Create a completely new {@link TransportClient} to the given remote host / port.
+   * This connection is not pooled.
+   *
+   * As with {@link #createClient(String, int)}, this method is blocking.
+   */
+  public TransportClient createUnmanagedClient(String remoteHost, int remotePort)
+      throws IOException {
+    final InetSocketAddress address = new InetSocketAddress(remoteHost, remotePort);
+    return createClient(address);
+  }
+
+  /** Create a completely new {@link TransportClient} to the remote address. */
+  private TransportClient createClient(InetSocketAddress address) throws IOException {
+    logger.debug("Creating new connection to " + address);
+
+    Bootstrap bootstrap = new Bootstrap();
+    bootstrap.group(workerGroup)
+      .channel(socketChannelClass)
+      // Disable Nagle's Algorithm since we don't want packets to wait
+      .option(ChannelOption.TCP_NODELAY, true)
+      .option(ChannelOption.SO_KEEPALIVE, true)
+      .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, conf.connectionTimeoutMs())
+      .option(ChannelOption.ALLOCATOR, pooledAllocator);
+
+    final AtomicReference<TransportClient> clientRef = new AtomicReference<TransportClient>();
+    final AtomicReference<Channel> channelRef = new AtomicReference<Channel>();
+
+    bootstrap.handler(new ChannelInitializer<SocketChannel>() {
+      @Override
+      public void initChannel(SocketChannel ch) {
+        TransportChannelHandler clientHandler = context.initializePipeline(ch);
+        clientRef.set(clientHandler.getClient());
+        channelRef.set(ch);
+      }
+    });
+
+    // Connect to the remote server
+    long preConnect = System.nanoTime();
+    ChannelFuture cf = bootstrap.connect(address);
+    if (!cf.awaitUninterruptibly(conf.connectionTimeoutMs())) {
+      throw new IOException(
+        String.format("Connecting to %s timed out (%s ms)", address, conf.connectionTimeoutMs()));
+    } else if (cf.cause() != null) {
+      throw new IOException(String.format("Failed to connect to %s", address), cf.cause());
+    }
+
+    TransportClient client = clientRef.get();
+    Channel channel = channelRef.get();
+    assert client != null : "Channel future completed successfully with null client";
+
+    // Execute any client bootstraps synchronously before marking the Client as successful.
+    long preBootstrap = System.nanoTime();
+    logger.debug("Connection to {} successful, running bootstraps...", address);
+    try {
+      for (TransportClientBootstrap clientBootstrap : clientBootstraps) {
+        clientBootstrap.doBootstrap(client, channel);
+      }
+    } catch (Exception e) { // catch non-RuntimeExceptions too as bootstrap may be written in Scala
+      long bootstrapTimeMs = (System.nanoTime() - preBootstrap) / 1000000;
+      logger.error("Exception while bootstrapping client after " + bootstrapTimeMs + " ms", e);
+      client.close();
+      throw Throwables.propagate(e);
+    }
+    long postBootstrap = System.nanoTime();
+
+    logger.debug("Successfully created connection to {} after {} ms ({} ms spent in bootstraps)",
+      address, (postBootstrap - preConnect) / 1000000, (postBootstrap - preBootstrap) / 1000000);
+
+    return client;
+  }
+
+  /** Close all connections in the connection pool, and shutdown the worker thread pool. */
+  @Override
+  public void close() {
+    // Go through all clients and close them if they are active.
+    for (ClientPool clientPool : connectionPool.values()) {
+      for (int i = 0; i < clientPool.clients.length; i++) {
+        TransportClient client = clientPool.clients[i];
+        if (client != null) {
+          clientPool.clients[i] = null;
+          JavaUtils.closeQuietly(client);
+        }
+      }
+    }
+    connectionPool.clear();
+
+    if (workerGroup != null) {
+      workerGroup.shutdownGracefully();
+      workerGroup = null;
+    }
+  }
+}
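
To tie the pieces together, a hypothetical caller (assuming an already-configured TransportContext; the method and class names below are illustrative only) would construct the factory with its bootstraps and then ask it for pooled clients:

import java.io.IOException;
import java.util.Collections;

import org.apache.spark.network.TransportContext;

final class FactoryDemo {
  static TransportClient connect(TransportContext context, String host, int port)
      throws IOException {
    TransportClientFactory factory = new TransportClientFactory(
        context, Collections.<TransportClientBootstrap>emptyList());
    // createClient() keeps up to numConnectionsPerPeer pooled connections per
    // remote address and returns one at random; call factory.close() when the
    // application shuts down to release them and the Netty event loop.
    return factory.createClient(host, port);
  }
}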

