You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2016/02/29 02:25:30 UTC
[14/14] spark git commit: [SPARK-13529][BUILD] Move network/* modules
into common/network-*
[SPARK-13529][BUILD] Move network/* modules into common/network-*
## What changes were proposed in this pull request?
As the title says, this moves the three modules currently in network/ into common/network-*. This removes one top level, non-user-facing folder.
## How was this patch tested?
Compilation and existing tests. We should run both SBT and Maven.
Author: Reynold Xin <rx...@databricks.com>
Closes #11409 from rxin/SPARK-13529.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9e01dcc6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9e01dcc6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9e01dcc6
Branch: refs/heads/master
Commit: 9e01dcc6446f8648e61062f8afe62589b9d4b5ab
Parents: cca79fa
Author: Reynold Xin <rx...@databricks.com>
Authored: Sun Feb 28 17:25:07 2016 -0800
Committer: Reynold Xin <rx...@databricks.com>
Committed: Sun Feb 28 17:25:07 2016 -0800
----------------------------------------------------------------------
common/network-common/pom.xml | 103 ++++
.../apache/spark/network/TransportContext.java | 166 +++++++
.../buffer/FileSegmentManagedBuffer.java | 154 ++++++
.../spark/network/buffer/LazyFileRegion.java | 111 +++++
.../spark/network/buffer/ManagedBuffer.java | 75 +++
.../network/buffer/NettyManagedBuffer.java | 76 +++
.../spark/network/buffer/NioManagedBuffer.java | 75 +++
.../client/ChunkFetchFailureException.java | 31 ++
.../network/client/ChunkReceivedCallback.java | 47 ++
.../network/client/RpcResponseCallback.java | 32 ++
.../spark/network/client/StreamCallback.java | 40 ++
.../spark/network/client/StreamInterceptor.java | 86 ++++
.../spark/network/client/TransportClient.java | 321 +++++++++++++
.../client/TransportClientBootstrap.java | 34 ++
.../network/client/TransportClientFactory.java | 264 ++++++++++
.../client/TransportResponseHandler.java | 251 ++++++++++
.../spark/network/protocol/AbstractMessage.java | 54 +++
.../protocol/AbstractResponseMessage.java | 32 ++
.../network/protocol/ChunkFetchFailure.java | 76 +++
.../network/protocol/ChunkFetchRequest.java | 71 +++
.../network/protocol/ChunkFetchSuccess.java | 89 ++++
.../spark/network/protocol/Encodable.java | 41 ++
.../apache/spark/network/protocol/Encoders.java | 92 ++++
.../apache/spark/network/protocol/Message.java | 73 +++
.../spark/network/protocol/MessageDecoder.java | 82 ++++
.../spark/network/protocol/MessageEncoder.java | 93 ++++
.../network/protocol/MessageWithHeader.java | 135 ++++++
.../spark/network/protocol/OneWayMessage.java | 80 ++++
.../spark/network/protocol/RequestMessage.java | 25 +
.../spark/network/protocol/ResponseMessage.java | 25 +
.../spark/network/protocol/RpcFailure.java | 74 +++
.../spark/network/protocol/RpcRequest.java | 87 ++++
.../spark/network/protocol/RpcResponse.java | 87 ++++
.../spark/network/protocol/StreamChunkId.java | 73 +++
.../spark/network/protocol/StreamFailure.java | 80 ++++
.../spark/network/protocol/StreamRequest.java | 78 +++
.../spark/network/protocol/StreamResponse.java | 92 ++++
.../spark/network/sasl/SaslClientBootstrap.java | 109 +++++
.../spark/network/sasl/SaslEncryption.java | 291 ++++++++++++
.../network/sasl/SaslEncryptionBackend.java | 33 ++
.../apache/spark/network/sasl/SaslMessage.java | 78 +++
.../spark/network/sasl/SaslRpcHandler.java | 158 ++++++
.../spark/network/sasl/SaslServerBootstrap.java | 49 ++
.../spark/network/sasl/SecretKeyHolder.java | 35 ++
.../spark/network/sasl/SparkSaslClient.java | 162 +++++++
.../spark/network/sasl/SparkSaslServer.java | 200 ++++++++
.../spark/network/server/MessageHandler.java | 39 ++
.../spark/network/server/NoOpRpcHandler.java | 40 ++
.../network/server/OneForOneStreamManager.java | 143 ++++++
.../apache/spark/network/server/RpcHandler.java | 100 ++++
.../spark/network/server/StreamManager.java | 86 ++++
.../network/server/TransportChannelHandler.java | 163 +++++++
.../network/server/TransportRequestHandler.java | 209 ++++++++
.../spark/network/server/TransportServer.java | 151 ++++++
.../server/TransportServerBootstrap.java | 36 ++
.../network/util/ByteArrayWritableChannel.java | 69 +++
.../org/apache/spark/network/util/ByteUnit.java | 67 +++
.../spark/network/util/ConfigProvider.java | 52 ++
.../org/apache/spark/network/util/IOMode.java | 27 ++
.../apache/spark/network/util/JavaUtils.java | 303 ++++++++++++
.../spark/network/util/LimitedInputStream.java | 105 ++++
.../spark/network/util/MapConfigProvider.java | 41 ++
.../apache/spark/network/util/NettyUtils.java | 139 ++++++
.../util/SystemPropertyConfigProvider.java | 34 ++
.../spark/network/util/TransportConf.java | 169 +++++++
.../network/util/TransportFrameDecoder.java | 227 +++++++++
.../network/ChunkFetchIntegrationSuite.java | 244 ++++++++++
.../org/apache/spark/network/ProtocolSuite.java | 127 +++++
.../network/RequestTimeoutIntegrationSuite.java | 288 +++++++++++
.../spark/network/RpcIntegrationSuite.java | 215 +++++++++
.../org/apache/spark/network/StreamSuite.java | 349 ++++++++++++++
.../apache/spark/network/TestManagedBuffer.java | 109 +++++
.../org/apache/spark/network/TestUtils.java | 30 ++
.../network/TransportClientFactorySuite.java | 214 +++++++++
.../network/TransportResponseHandlerSuite.java | 146 ++++++
.../protocol/MessageWithHeaderSuite.java | 157 ++++++
.../spark/network/sasl/SparkSaslSuite.java | 476 +++++++++++++++++++
.../server/OneForOneStreamManagerSuite.java | 50 ++
.../util/TransportFrameDecoderSuite.java | 258 ++++++++++
.../src/test/resources/log4j.properties | 27 ++
common/network-shuffle/pom.xml | 101 ++++
.../network/sasl/ShuffleSecretManager.java | 97 ++++
.../network/shuffle/BlockFetchingListener.java | 36 ++
.../shuffle/ExternalShuffleBlockHandler.java | 140 ++++++
.../shuffle/ExternalShuffleBlockResolver.java | 449 +++++++++++++++++
.../network/shuffle/ExternalShuffleClient.java | 154 ++++++
.../network/shuffle/OneForOneBlockFetcher.java | 129 +++++
.../network/shuffle/RetryingBlockFetcher.java | 234 +++++++++
.../spark/network/shuffle/ShuffleClient.java | 44 ++
.../mesos/MesosExternalShuffleClient.java | 73 +++
.../shuffle/protocol/BlockTransferMessage.java | 81 ++++
.../shuffle/protocol/ExecutorShuffleInfo.java | 94 ++++
.../network/shuffle/protocol/OpenBlocks.java | 90 ++++
.../shuffle/protocol/RegisterExecutor.java | 94 ++++
.../network/shuffle/protocol/StreamHandle.java | 81 ++++
.../network/shuffle/protocol/UploadBlock.java | 117 +++++
.../shuffle/protocol/mesos/RegisterDriver.java | 63 +++
.../network/sasl/SaslIntegrationSuite.java | 294 ++++++++++++
.../shuffle/BlockTransferMessagesSuite.java | 44 ++
.../ExternalShuffleBlockHandlerSuite.java | 127 +++++
.../ExternalShuffleBlockResolverSuite.java | 156 ++++++
.../shuffle/ExternalShuffleCleanupSuite.java | 149 ++++++
.../ExternalShuffleIntegrationSuite.java | 301 ++++++++++++
.../shuffle/ExternalShuffleSecuritySuite.java | 124 +++++
.../shuffle/OneForOneBlockFetcherSuite.java | 176 +++++++
.../shuffle/RetryingBlockFetcherSuite.java | 313 ++++++++++++
.../network/shuffle/TestShuffleDataContext.java | 117 +++++
common/network-yarn/pom.xml | 148 ++++++
.../spark/network/yarn/YarnShuffleService.java | 224 +++++++++
.../network/yarn/util/HadoopConfigProvider.java | 42 ++
dev/sparktestsupport/modules.py | 4 +-
docs/job-scheduling.md | 2 +-
.../spark/launcher/AbstractCommandBuilder.java | 3 +-
make-distribution.sh | 2 +-
network/common/pom.xml | 103 ----
.../apache/spark/network/TransportContext.java | 166 -------
.../buffer/FileSegmentManagedBuffer.java | 154 ------
.../spark/network/buffer/LazyFileRegion.java | 111 -----
.../spark/network/buffer/ManagedBuffer.java | 75 ---
.../network/buffer/NettyManagedBuffer.java | 76 ---
.../spark/network/buffer/NioManagedBuffer.java | 75 ---
.../client/ChunkFetchFailureException.java | 31 --
.../network/client/ChunkReceivedCallback.java | 47 --
.../network/client/RpcResponseCallback.java | 32 --
.../spark/network/client/StreamCallback.java | 40 --
.../spark/network/client/StreamInterceptor.java | 86 ----
.../spark/network/client/TransportClient.java | 321 -------------
.../client/TransportClientBootstrap.java | 34 --
.../network/client/TransportClientFactory.java | 264 ----------
.../client/TransportResponseHandler.java | 251 ----------
.../spark/network/protocol/AbstractMessage.java | 54 ---
.../protocol/AbstractResponseMessage.java | 32 --
.../network/protocol/ChunkFetchFailure.java | 76 ---
.../network/protocol/ChunkFetchRequest.java | 71 ---
.../network/protocol/ChunkFetchSuccess.java | 89 ----
.../spark/network/protocol/Encodable.java | 41 --
.../apache/spark/network/protocol/Encoders.java | 92 ----
.../apache/spark/network/protocol/Message.java | 73 ---
.../spark/network/protocol/MessageDecoder.java | 82 ----
.../spark/network/protocol/MessageEncoder.java | 93 ----
.../network/protocol/MessageWithHeader.java | 135 ------
.../spark/network/protocol/OneWayMessage.java | 80 ----
.../spark/network/protocol/RequestMessage.java | 25 -
.../spark/network/protocol/ResponseMessage.java | 25 -
.../spark/network/protocol/RpcFailure.java | 74 ---
.../spark/network/protocol/RpcRequest.java | 87 ----
.../spark/network/protocol/RpcResponse.java | 87 ----
.../spark/network/protocol/StreamChunkId.java | 73 ---
.../spark/network/protocol/StreamFailure.java | 80 ----
.../spark/network/protocol/StreamRequest.java | 78 ---
.../spark/network/protocol/StreamResponse.java | 92 ----
.../spark/network/sasl/SaslClientBootstrap.java | 109 -----
.../spark/network/sasl/SaslEncryption.java | 291 ------------
.../network/sasl/SaslEncryptionBackend.java | 33 --
.../apache/spark/network/sasl/SaslMessage.java | 78 ---
.../spark/network/sasl/SaslRpcHandler.java | 158 ------
.../spark/network/sasl/SaslServerBootstrap.java | 49 --
.../spark/network/sasl/SecretKeyHolder.java | 35 --
.../spark/network/sasl/SparkSaslClient.java | 162 -------
.../spark/network/sasl/SparkSaslServer.java | 200 --------
.../spark/network/server/MessageHandler.java | 39 --
.../spark/network/server/NoOpRpcHandler.java | 40 --
.../network/server/OneForOneStreamManager.java | 143 ------
.../apache/spark/network/server/RpcHandler.java | 100 ----
.../spark/network/server/StreamManager.java | 86 ----
.../network/server/TransportChannelHandler.java | 163 -------
.../network/server/TransportRequestHandler.java | 209 --------
.../spark/network/server/TransportServer.java | 151 ------
.../server/TransportServerBootstrap.java | 36 --
.../network/util/ByteArrayWritableChannel.java | 69 ---
.../org/apache/spark/network/util/ByteUnit.java | 67 ---
.../spark/network/util/ConfigProvider.java | 52 --
.../org/apache/spark/network/util/IOMode.java | 27 --
.../apache/spark/network/util/JavaUtils.java | 303 ------------
.../spark/network/util/LimitedInputStream.java | 105 ----
.../spark/network/util/MapConfigProvider.java | 41 --
.../apache/spark/network/util/NettyUtils.java | 139 ------
.../util/SystemPropertyConfigProvider.java | 34 --
.../spark/network/util/TransportConf.java | 169 -------
.../network/util/TransportFrameDecoder.java | 227 ---------
.../network/ChunkFetchIntegrationSuite.java | 244 ----------
.../org/apache/spark/network/ProtocolSuite.java | 127 -----
.../network/RequestTimeoutIntegrationSuite.java | 288 -----------
.../spark/network/RpcIntegrationSuite.java | 215 ---------
.../org/apache/spark/network/StreamSuite.java | 349 --------------
.../apache/spark/network/TestManagedBuffer.java | 109 -----
.../org/apache/spark/network/TestUtils.java | 30 --
.../network/TransportClientFactorySuite.java | 214 ---------
.../network/TransportResponseHandlerSuite.java | 146 ------
.../protocol/MessageWithHeaderSuite.java | 157 ------
.../spark/network/sasl/SparkSaslSuite.java | 476 -------------------
.../server/OneForOneStreamManagerSuite.java | 50 --
.../util/TransportFrameDecoderSuite.java | 258 ----------
.../common/src/test/resources/log4j.properties | 27 --
network/shuffle/pom.xml | 101 ----
.../network/sasl/ShuffleSecretManager.java | 97 ----
.../network/shuffle/BlockFetchingListener.java | 36 --
.../shuffle/ExternalShuffleBlockHandler.java | 140 ------
.../shuffle/ExternalShuffleBlockResolver.java | 449 -----------------
.../network/shuffle/ExternalShuffleClient.java | 154 ------
.../network/shuffle/OneForOneBlockFetcher.java | 129 -----
.../network/shuffle/RetryingBlockFetcher.java | 234 ---------
.../spark/network/shuffle/ShuffleClient.java | 44 --
.../mesos/MesosExternalShuffleClient.java | 73 ---
.../shuffle/protocol/BlockTransferMessage.java | 81 ----
.../shuffle/protocol/ExecutorShuffleInfo.java | 94 ----
.../network/shuffle/protocol/OpenBlocks.java | 90 ----
.../shuffle/protocol/RegisterExecutor.java | 94 ----
.../network/shuffle/protocol/StreamHandle.java | 81 ----
.../network/shuffle/protocol/UploadBlock.java | 117 -----
.../shuffle/protocol/mesos/RegisterDriver.java | 63 ---
.../network/sasl/SaslIntegrationSuite.java | 294 ------------
.../shuffle/BlockTransferMessagesSuite.java | 44 --
.../ExternalShuffleBlockHandlerSuite.java | 127 -----
.../ExternalShuffleBlockResolverSuite.java | 156 ------
.../shuffle/ExternalShuffleCleanupSuite.java | 149 ------
.../ExternalShuffleIntegrationSuite.java | 301 ------------
.../shuffle/ExternalShuffleSecuritySuite.java | 124 -----
.../shuffle/OneForOneBlockFetcherSuite.java | 176 -------
.../shuffle/RetryingBlockFetcherSuite.java | 313 ------------
.../network/shuffle/TestShuffleDataContext.java | 117 -----
network/yarn/pom.xml | 148 ------
.../spark/network/yarn/YarnShuffleService.java | 224 ---------
.../network/yarn/util/HadoopConfigProvider.java | 42 --
pom.xml | 6 +-
225 files changed, 13711 insertions(+), 13710 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/common/network-common/pom.xml
----------------------------------------------------------------------
diff --git a/common/network-common/pom.xml b/common/network-common/pom.xml
new file mode 100644
index 0000000..bd507c2
--- /dev/null
+++ b/common/network-common/pom.xml
@@ -0,0 +1,103 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-parent_2.11</artifactId>
+ <version>2.0.0-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-network-common_2.11</artifactId>
+ <packaging>jar</packaging>
+ <name>Spark Project Networking</name>
+ <url>http://spark.apache.org/</url>
+ <properties>
+ <sbt.project.name>network-common</sbt.project.name>
+ </properties>
+
+ <dependencies>
+ <!-- Core dependencies -->
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-all</artifactId>
+ </dependency>
+
+ <!-- Provided dependencies -->
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.google.code.findbugs</groupId>
+ <artifactId>jsr305</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <scope>compile</scope>
+ </dependency>
+
+ <!-- Test dependencies -->
+ <dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-test-tags_${scala.binary.version}</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+ <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+ <plugins>
+ <!-- Create a test-jar so network-shuffle can depend on our test utilities. -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>test-jar-on-test-compile</id>
+ <phase>test-compile</phase>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java
----------------------------------------------------------------------
diff --git a/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java b/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java
new file mode 100644
index 0000000..238710d
--- /dev/null
+++ b/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network;
+
+import java.util.List;
+
+import com.google.common.collect.Lists;
+import io.netty.channel.Channel;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.timeout.IdleStateHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.spark.network.client.TransportClient;
+import org.apache.spark.network.client.TransportClientBootstrap;
+import org.apache.spark.network.client.TransportClientFactory;
+import org.apache.spark.network.client.TransportResponseHandler;
+import org.apache.spark.network.protocol.MessageDecoder;
+import org.apache.spark.network.protocol.MessageEncoder;
+import org.apache.spark.network.server.RpcHandler;
+import org.apache.spark.network.server.TransportChannelHandler;
+import org.apache.spark.network.server.TransportRequestHandler;
+import org.apache.spark.network.server.TransportServer;
+import org.apache.spark.network.server.TransportServerBootstrap;
+import org.apache.spark.network.util.NettyUtils;
+import org.apache.spark.network.util.TransportConf;
+import org.apache.spark.network.util.TransportFrameDecoder;
+
+/**
+ * Contains the context to create a {@link TransportServer}, {@link TransportClientFactory}, and to
+ * setup Netty Channel pipelines with a {@link org.apache.spark.network.server.TransportChannelHandler}.
+ *
+ * There are two communication protocols that the TransportClient provides, control-plane RPCs and
+ * data-plane "chunk fetching". The handling of the RPCs is performed outside of the scope of the
+ * TransportContext (i.e., by a user-provided handler), and it is responsible for setting up streams
+ * which can be streamed through the data plane in chunks using zero-copy IO.
+ *
+ * The TransportServer and TransportClientFactory both create a TransportChannelHandler for each
+ * channel. As each TransportChannelHandler contains a TransportClient, this enables server
+ * processes to send messages back to the client on an existing channel.
+ */
+public class TransportContext {
+ private final Logger logger = LoggerFactory.getLogger(TransportContext.class);
+
+ private final TransportConf conf;
+ private final RpcHandler rpcHandler;
+ private final boolean closeIdleConnections;
+
+ private final MessageEncoder encoder;
+ private final MessageDecoder decoder;
+
+ public TransportContext(TransportConf conf, RpcHandler rpcHandler) {
+ this(conf, rpcHandler, false);
+ }
+
+ public TransportContext(
+ TransportConf conf,
+ RpcHandler rpcHandler,
+ boolean closeIdleConnections) {
+ this.conf = conf;
+ this.rpcHandler = rpcHandler;
+ this.encoder = new MessageEncoder();
+ this.decoder = new MessageDecoder();
+ this.closeIdleConnections = closeIdleConnections;
+ }
+
+ /**
+ * Initializes a ClientFactory which runs the given TransportClientBootstraps prior to returning
+ * a new Client. Bootstraps will be executed synchronously, and must run successfully in order
+ * to create a Client.
+ */
+ public TransportClientFactory createClientFactory(List<TransportClientBootstrap> bootstraps) {
+ return new TransportClientFactory(this, bootstraps);
+ }
+
+ public TransportClientFactory createClientFactory() {
+ return createClientFactory(Lists.<TransportClientBootstrap>newArrayList());
+ }
+
+ /** Create a server which will attempt to bind to a specific port. */
+ public TransportServer createServer(int port, List<TransportServerBootstrap> bootstraps) {
+ return new TransportServer(this, null, port, rpcHandler, bootstraps);
+ }
+
+ /** Create a server which will attempt to bind to a specific host and port. */
+ public TransportServer createServer(
+ String host, int port, List<TransportServerBootstrap> bootstraps) {
+ return new TransportServer(this, host, port, rpcHandler, bootstraps);
+ }
+
+ /** Creates a new server, binding to any available ephemeral port. */
+ public TransportServer createServer(List<TransportServerBootstrap> bootstraps) {
+ return createServer(0, bootstraps);
+ }
+
+ public TransportServer createServer() {
+ return createServer(0, Lists.<TransportServerBootstrap>newArrayList());
+ }
+
+ public TransportChannelHandler initializePipeline(SocketChannel channel) {
+ return initializePipeline(channel, rpcHandler);
+ }
+
+ /**
+ * Initializes a client or server Netty Channel Pipeline which encodes/decodes messages and
+ * has a {@link org.apache.spark.network.server.TransportChannelHandler} to handle request or
+ * response messages.
+ *
+ * @param channel The channel to initialize.
+ * @param channelRpcHandler The RPC handler to use for the channel.
+ *
+ * @return Returns the created TransportChannelHandler, which includes a TransportClient that can
+ * be used to communicate on this channel. The TransportClient is directly associated with a
+ * ChannelHandler to ensure all users of the same channel get the same TransportClient object.
+ */
+ public TransportChannelHandler initializePipeline(
+ SocketChannel channel,
+ RpcHandler channelRpcHandler) {
+ try {
+ TransportChannelHandler channelHandler = createChannelHandler(channel, channelRpcHandler);
+ channel.pipeline()
+ .addLast("encoder", encoder)
+ .addLast(TransportFrameDecoder.HANDLER_NAME, NettyUtils.createFrameDecoder())
+ .addLast("decoder", decoder)
+ .addLast("idleStateHandler", new IdleStateHandler(0, 0, conf.connectionTimeoutMs() / 1000))
+ // NOTE: Chunks are currently guaranteed to be returned in the order of request, but this
+ // would require more logic to guarantee if this were not part of the same event loop.
+ .addLast("handler", channelHandler);
+ return channelHandler;
+ } catch (RuntimeException e) {
+ logger.error("Error while initializing Netty pipeline", e);
+ throw e;
+ }
+ }
+
+ /**
+ * Creates the server- and client-side handler which is used to handle both RequestMessages and
+ * ResponseMessages. The channel is expected to have been successfully created, though certain
+ * properties (such as the remoteAddress()) may not be available yet.
+ */
+ private TransportChannelHandler createChannelHandler(Channel channel, RpcHandler rpcHandler) {
+ TransportResponseHandler responseHandler = new TransportResponseHandler(channel);
+ TransportClient client = new TransportClient(channel, responseHandler);
+ TransportRequestHandler requestHandler = new TransportRequestHandler(channel, client,
+ rpcHandler);
+ return new TransportChannelHandler(client, responseHandler, requestHandler,
+ conf.connectionTimeoutMs(), closeIdleConnections);
+ }
+
+ public TransportConf getConf() { return conf; }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java
----------------------------------------------------------------------
diff --git a/common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java b/common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java
new file mode 100644
index 0000000..844eff4
--- /dev/null
+++ b/common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.buffer;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+
+import com.google.common.base.Objects;
+import com.google.common.io.ByteStreams;
+import io.netty.channel.DefaultFileRegion;
+
+import org.apache.spark.network.util.JavaUtils;
+import org.apache.spark.network.util.LimitedInputStream;
+import org.apache.spark.network.util.TransportConf;
+
+/**
+ * A {@link ManagedBuffer} backed by a segment in a file.
+ */
+public final class FileSegmentManagedBuffer extends ManagedBuffer {
+ private final TransportConf conf;
+ private final File file;
+ private final long offset;
+ private final long length;
+
+ public FileSegmentManagedBuffer(TransportConf conf, File file, long offset, long length) {
+ this.conf = conf;
+ this.file = file;
+ this.offset = offset;
+ this.length = length;
+ }
+
+ @Override
+ public long size() {
+ return length;
+ }
+
+ @Override
+ public ByteBuffer nioByteBuffer() throws IOException {
+ FileChannel channel = null;
+ try {
+ channel = new RandomAccessFile(file, "r").getChannel();
+ // Just copy the buffer if it's sufficiently small, as memory mapping has a high overhead.
+ if (length < conf.memoryMapBytes()) {
+ ByteBuffer buf = ByteBuffer.allocate((int) length);
+ channel.position(offset);
+ while (buf.remaining() != 0) {
+ if (channel.read(buf) == -1) {
+ throw new IOException(String.format("Reached EOF before filling buffer\n" +
+ "offset=%s\nfile=%s\nbuf.remaining=%s",
+ offset, file.getAbsoluteFile(), buf.remaining()));
+ }
+ }
+ buf.flip();
+ return buf;
+ } else {
+ return channel.map(FileChannel.MapMode.READ_ONLY, offset, length);
+ }
+ } catch (IOException e) {
+ try {
+ if (channel != null) {
+ long size = channel.size();
+ throw new IOException("Error in reading " + this + " (actual file length " + size + ")",
+ e);
+ }
+ } catch (IOException ignored) {
+ // ignore
+ }
+ throw new IOException("Error in opening " + this, e);
+ } finally {
+ JavaUtils.closeQuietly(channel);
+ }
+ }
+
+ @Override
+ public InputStream createInputStream() throws IOException {
+ FileInputStream is = null;
+ try {
+ is = new FileInputStream(file);
+ ByteStreams.skipFully(is, offset);
+ return new LimitedInputStream(is, length);
+ } catch (IOException e) {
+ try {
+ if (is != null) {
+ long size = file.length();
+ throw new IOException("Error in reading " + this + " (actual file length " + size + ")",
+ e);
+ }
+ } catch (IOException ignored) {
+ // ignore
+ } finally {
+ JavaUtils.closeQuietly(is);
+ }
+ throw new IOException("Error in opening " + this, e);
+ } catch (RuntimeException e) {
+ JavaUtils.closeQuietly(is);
+ throw e;
+ }
+ }
+
+ @Override
+ public ManagedBuffer retain() {
+ return this;
+ }
+
+ @Override
+ public ManagedBuffer release() {
+ return this;
+ }
+
+ @Override
+ public Object convertToNetty() throws IOException {
+ if (conf.lazyFileDescriptor()) {
+ return new LazyFileRegion(file, offset, length);
+ } else {
+ FileChannel fileChannel = new FileInputStream(file).getChannel();
+ return new DefaultFileRegion(fileChannel, offset, length);
+ }
+ }
+
+ public File getFile() { return file; }
+
+ public long getOffset() { return offset; }
+
+ public long getLength() { return length; }
+
+ @Override
+ public String toString() {
+ return Objects.toStringHelper(this)
+ .add("file", file)
+ .add("offset", offset)
+ .add("length", length)
+ .toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/common/network-common/src/main/java/org/apache/spark/network/buffer/LazyFileRegion.java
----------------------------------------------------------------------
diff --git a/common/network-common/src/main/java/org/apache/spark/network/buffer/LazyFileRegion.java b/common/network-common/src/main/java/org/apache/spark/network/buffer/LazyFileRegion.java
new file mode 100644
index 0000000..162cf6d
--- /dev/null
+++ b/common/network-common/src/main/java/org/apache/spark/network/buffer/LazyFileRegion.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.buffer;
+
+import java.io.FileInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.nio.channels.FileChannel;
+import java.nio.channels.WritableByteChannel;
+
+import com.google.common.base.Objects;
+import io.netty.channel.FileRegion;
+import io.netty.util.AbstractReferenceCounted;
+
+import org.apache.spark.network.util.JavaUtils;
+
+/**
+ * A FileRegion implementation that only creates the file descriptor when the region is being
+ * transferred. This cannot be used with Epoll because there is no native support for it.
+ *
+ * This is mostly copied from DefaultFileRegion implementation in Netty. In the future, we
+ * should push this into Netty so the native Epoll transport can support this feature.
+ */
+public final class LazyFileRegion extends AbstractReferenceCounted implements FileRegion {
+
+ private final File file;
+ private final long position;
+ private final long count;
+
+ private FileChannel channel;
+
+ private long numBytesTransferred = 0L;
+
+ /**
+ * @param file file to transfer.
+ * @param position start position for the transfer.
+ * @param count number of bytes to transfer starting from position.
+ */
+ public LazyFileRegion(File file, long position, long count) {
+ this.file = file;
+ this.position = position;
+ this.count = count;
+ }
+
+ @Override
+ protected void deallocate() {
+ JavaUtils.closeQuietly(channel);
+ }
+
+ @Override
+ public long position() {
+ return position;
+ }
+
+ @Override
+ public long transfered() {
+ return numBytesTransferred;
+ }
+
+ @Override
+ public long count() {
+ return count;
+ }
+
+ @Override
+ public long transferTo(WritableByteChannel target, long position) throws IOException {
+ if (channel == null) {
+ channel = new FileInputStream(file).getChannel();
+ }
+
+ long count = this.count - position;
+ if (count < 0 || position < 0) {
+ throw new IllegalArgumentException(
+ "position out of range: " + position + " (expected: 0 - " + (count - 1) + ')');
+ }
+
+ if (count == 0) {
+ return 0L;
+ }
+
+ long written = channel.transferTo(this.position + position, count, target);
+ if (written > 0) {
+ numBytesTransferred += written;
+ }
+ return written;
+ }
+
+ @Override
+ public String toString() {
+ return Objects.toStringHelper(this)
+ .add("file", file)
+ .add("position", position)
+ .add("count", count)
+ .toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/common/network-common/src/main/java/org/apache/spark/network/buffer/ManagedBuffer.java
----------------------------------------------------------------------
diff --git a/common/network-common/src/main/java/org/apache/spark/network/buffer/ManagedBuffer.java b/common/network-common/src/main/java/org/apache/spark/network/buffer/ManagedBuffer.java
new file mode 100644
index 0000000..1861f8d
--- /dev/null
+++ b/common/network-common/src/main/java/org/apache/spark/network/buffer/ManagedBuffer.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.buffer;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * This interface provides an immutable view for data in the form of bytes. The implementation
+ * should specify how the data is provided:
+ *
+ * - {@link FileSegmentManagedBuffer}: data backed by part of a file
+ * - {@link NioManagedBuffer}: data backed by a NIO ByteBuffer
+ * - {@link NettyManagedBuffer}: data backed by a Netty ByteBuf
+ *
+ * The concrete buffer implementation might be managed outside the JVM garbage collector.
+ * For example, in the case of {@link NettyManagedBuffer}, the buffers are reference counted.
+ * In that case, if the buffer is going to be passed around to a different thread, retain/release
+ * should be called.
+ */
+public abstract class ManagedBuffer {
+
+ /** Number of bytes of the data. */
+ public abstract long size();
+
+ /**
+ * Exposes this buffer's data as an NIO ByteBuffer. Changing the position and limit of the
+ * returned ByteBuffer should not affect the content of this buffer.
+ */
+ // TODO: Deprecate this, usage may require expensive memory mapping or allocation.
+ public abstract ByteBuffer nioByteBuffer() throws IOException;
+
+ /**
+ * Exposes this buffer's data as an InputStream. The underlying implementation does not
+ * necessarily check for the length of bytes read, so the caller is responsible for making sure
+ * it does not go over the limit.
+ */
+ public abstract InputStream createInputStream() throws IOException;
+
+ /**
+ * Increment the reference count by one if applicable.
+ */
+ public abstract ManagedBuffer retain();
+
+ /**
+ * If applicable, decrement the reference count by one and deallocates the buffer if the
+ * reference count reaches zero.
+ */
+ public abstract ManagedBuffer release();
+
+ /**
+ * Convert the buffer into an Netty object, used to write the data out. The return value is either
+ * a {@link io.netty.buffer.ByteBuf} or a {@link io.netty.channel.FileRegion}.
+ *
+ * If this method returns a ByteBuf, then that buffer's reference count will be incremented and
+ * the caller will be responsible for releasing this new reference.
+ */
+ public abstract Object convertToNetty() throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/common/network-common/src/main/java/org/apache/spark/network/buffer/NettyManagedBuffer.java
----------------------------------------------------------------------
diff --git a/common/network-common/src/main/java/org/apache/spark/network/buffer/NettyManagedBuffer.java b/common/network-common/src/main/java/org/apache/spark/network/buffer/NettyManagedBuffer.java
new file mode 100644
index 0000000..4c8802a
--- /dev/null
+++ b/common/network-common/src/main/java/org/apache/spark/network/buffer/NettyManagedBuffer.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.buffer;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+import com.google.common.base.Objects;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufInputStream;
+
+/**
+ * A {@link ManagedBuffer} backed by a Netty {@link ByteBuf}.
+ */
+public final class NettyManagedBuffer extends ManagedBuffer {
+ private final ByteBuf buf;
+
+ public NettyManagedBuffer(ByteBuf buf) {
+ this.buf = buf;
+ }
+
+ @Override
+ public long size() {
+ return buf.readableBytes();
+ }
+
+ @Override
+ public ByteBuffer nioByteBuffer() throws IOException {
+ return buf.nioBuffer();
+ }
+
+ @Override
+ public InputStream createInputStream() throws IOException {
+ return new ByteBufInputStream(buf);
+ }
+
+ @Override
+ public ManagedBuffer retain() {
+ buf.retain();
+ return this;
+ }
+
+ @Override
+ public ManagedBuffer release() {
+ buf.release();
+ return this;
+ }
+
+ @Override
+ public Object convertToNetty() throws IOException {
+ return buf.duplicate().retain();
+ }
+
+ @Override
+ public String toString() {
+ return Objects.toStringHelper(this)
+ .add("buf", buf)
+ .toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/common/network-common/src/main/java/org/apache/spark/network/buffer/NioManagedBuffer.java
----------------------------------------------------------------------
diff --git a/common/network-common/src/main/java/org/apache/spark/network/buffer/NioManagedBuffer.java b/common/network-common/src/main/java/org/apache/spark/network/buffer/NioManagedBuffer.java
new file mode 100644
index 0000000..631d767
--- /dev/null
+++ b/common/network-common/src/main/java/org/apache/spark/network/buffer/NioManagedBuffer.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.buffer;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+import com.google.common.base.Objects;
+import io.netty.buffer.ByteBufInputStream;
+import io.netty.buffer.Unpooled;
+
+/**
+ * A {@link ManagedBuffer} backed by {@link ByteBuffer}.
+ */
+public class NioManagedBuffer extends ManagedBuffer {
+ private final ByteBuffer buf;
+
+ public NioManagedBuffer(ByteBuffer buf) {
+ this.buf = buf;
+ }
+
+ @Override
+ public long size() {
+ return buf.remaining();
+ }
+
+ @Override
+ public ByteBuffer nioByteBuffer() throws IOException {
+ return buf.duplicate();
+ }
+
+ @Override
+ public InputStream createInputStream() throws IOException {
+ return new ByteBufInputStream(Unpooled.wrappedBuffer(buf));
+ }
+
+ @Override
+ public ManagedBuffer retain() {
+ return this;
+ }
+
+ @Override
+ public ManagedBuffer release() {
+ return this;
+ }
+
+ @Override
+ public Object convertToNetty() throws IOException {
+ return Unpooled.wrappedBuffer(buf);
+ }
+
+ @Override
+ public String toString() {
+ return Objects.toStringHelper(this)
+ .add("buf", buf)
+ .toString();
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/common/network-common/src/main/java/org/apache/spark/network/client/ChunkFetchFailureException.java
----------------------------------------------------------------------
diff --git a/common/network-common/src/main/java/org/apache/spark/network/client/ChunkFetchFailureException.java b/common/network-common/src/main/java/org/apache/spark/network/client/ChunkFetchFailureException.java
new file mode 100644
index 0000000..1fbdcd6
--- /dev/null
+++ b/common/network-common/src/main/java/org/apache/spark/network/client/ChunkFetchFailureException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.client;
+
+/**
+ * General exception caused by a remote exception while fetching a chunk.
+ */
+public class ChunkFetchFailureException extends RuntimeException {
+ public ChunkFetchFailureException(String errorMsg, Throwable cause) {
+ super(errorMsg, cause);
+ }
+
+ public ChunkFetchFailureException(String errorMsg) {
+ super(errorMsg);
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/common/network-common/src/main/java/org/apache/spark/network/client/ChunkReceivedCallback.java
----------------------------------------------------------------------
diff --git a/common/network-common/src/main/java/org/apache/spark/network/client/ChunkReceivedCallback.java b/common/network-common/src/main/java/org/apache/spark/network/client/ChunkReceivedCallback.java
new file mode 100644
index 0000000..519e6cb
--- /dev/null
+++ b/common/network-common/src/main/java/org/apache/spark/network/client/ChunkReceivedCallback.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.client;
+
+import org.apache.spark.network.buffer.ManagedBuffer;
+
+/**
+ * Callback for the result of a single chunk result. For a single stream, the callbacks are
+ * guaranteed to be called by the same thread in the same order as the requests for chunks were
+ * made.
+ *
+ * Note that if a general stream failure occurs, all outstanding chunk requests may be failed.
+ */
+public interface ChunkReceivedCallback {
+ /**
+ * Called upon receipt of a particular chunk.
+ *
+ * The given buffer will initially have a refcount of 1, but will be release()'d as soon as this
+ * call returns. You must therefore either retain() the buffer or copy its contents before
+ * returning.
+ */
+ void onSuccess(int chunkIndex, ManagedBuffer buffer);
+
+ /**
+ * Called upon failure to fetch a particular chunk. Note that this may actually be called due
+ * to failure to fetch a prior chunk in this stream.
+ *
+ * After receiving a failure, the stream may or may not be valid. The client should not assume
+ * that the server's side of the stream has been closed.
+ */
+ void onFailure(int chunkIndex, Throwable e);
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/common/network-common/src/main/java/org/apache/spark/network/client/RpcResponseCallback.java
----------------------------------------------------------------------
diff --git a/common/network-common/src/main/java/org/apache/spark/network/client/RpcResponseCallback.java b/common/network-common/src/main/java/org/apache/spark/network/client/RpcResponseCallback.java
new file mode 100644
index 0000000..47e93f9
--- /dev/null
+++ b/common/network-common/src/main/java/org/apache/spark/network/client/RpcResponseCallback.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.client;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Callback for the result of a single RPC. This will be invoked once with either success or
+ * failure.
+ */
+public interface RpcResponseCallback {
+ /** Successful serialized result from server. */
+ void onSuccess(ByteBuffer response);
+
+ /** Exception either propagated from server or raised on client side. */
+ void onFailure(Throwable e);
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/common/network-common/src/main/java/org/apache/spark/network/client/StreamCallback.java
----------------------------------------------------------------------
diff --git a/common/network-common/src/main/java/org/apache/spark/network/client/StreamCallback.java b/common/network-common/src/main/java/org/apache/spark/network/client/StreamCallback.java
new file mode 100644
index 0000000..29e6a30
--- /dev/null
+++ b/common/network-common/src/main/java/org/apache/spark/network/client/StreamCallback.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.client;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * Callback for streaming data. Stream data will be offered to the {@link #onData(String, ByteBuffer)}
+ * method as it arrives. Once all the stream data is received, {@link #onComplete(String)} will be
+ * called.
+ * <p>
+ * The network library guarantees that a single thread will call these methods at a time, but
+ * different call may be made by different threads.
+ */
+public interface StreamCallback {
+ /** Called upon receipt of stream data. */
+ void onData(String streamId, ByteBuffer buf) throws IOException;
+
+ /** Called when all data from the stream has been received. */
+ void onComplete(String streamId) throws IOException;
+
+ /** Called if there's an error reading data from the stream. */
+ void onFailure(String streamId, Throwable cause) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/common/network-common/src/main/java/org/apache/spark/network/client/StreamInterceptor.java
----------------------------------------------------------------------
diff --git a/common/network-common/src/main/java/org/apache/spark/network/client/StreamInterceptor.java b/common/network-common/src/main/java/org/apache/spark/network/client/StreamInterceptor.java
new file mode 100644
index 0000000..88ba3cc
--- /dev/null
+++ b/common/network-common/src/main/java/org/apache/spark/network/client/StreamInterceptor.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.client;
+
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
+
+import io.netty.buffer.ByteBuf;
+
+import org.apache.spark.network.util.TransportFrameDecoder;
+
+/**
+ * An interceptor that is registered with the frame decoder to feed stream data to a
+ * callback.
+ */
+class StreamInterceptor implements TransportFrameDecoder.Interceptor {
+
+ private final TransportResponseHandler handler;
+ private final String streamId;
+ private final long byteCount;
+ private final StreamCallback callback;
+
+ private volatile long bytesRead;
+
+ StreamInterceptor(
+ TransportResponseHandler handler,
+ String streamId,
+ long byteCount,
+ StreamCallback callback) {
+ this.handler = handler;
+ this.streamId = streamId;
+ this.byteCount = byteCount;
+ this.callback = callback;
+ this.bytesRead = 0;
+ }
+
+ @Override
+ public void exceptionCaught(Throwable cause) throws Exception {
+ handler.deactivateStream();
+ callback.onFailure(streamId, cause);
+ }
+
+ @Override
+ public void channelInactive() throws Exception {
+ handler.deactivateStream();
+ callback.onFailure(streamId, new ClosedChannelException());
+ }
+
+ @Override
+ public boolean handle(ByteBuf buf) throws Exception {
+ int toRead = (int) Math.min(buf.readableBytes(), byteCount - bytesRead);
+ ByteBuffer nioBuffer = buf.readSlice(toRead).nioBuffer();
+
+ int available = nioBuffer.remaining();
+ callback.onData(streamId, nioBuffer);
+ bytesRead += available;
+ if (bytesRead > byteCount) {
+ RuntimeException re = new IllegalStateException(String.format(
+ "Read too many bytes? Expected %d, but read %d.", byteCount, bytesRead));
+ callback.onFailure(streamId, re);
+ handler.deactivateStream();
+ throw re;
+ } else if (bytesRead == byteCount) {
+ handler.deactivateStream();
+ callback.onComplete(streamId);
+ }
+
+ return bytesRead != byteCount;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java
----------------------------------------------------------------------
diff --git a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java
new file mode 100644
index 0000000..e15f096
--- /dev/null
+++ b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.client;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.SettableFuture;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.spark.network.buffer.NioManagedBuffer;
+import org.apache.spark.network.protocol.ChunkFetchRequest;
+import org.apache.spark.network.protocol.OneWayMessage;
+import org.apache.spark.network.protocol.RpcRequest;
+import org.apache.spark.network.protocol.StreamChunkId;
+import org.apache.spark.network.protocol.StreamRequest;
+import org.apache.spark.network.util.NettyUtils;
+
+/**
+ * Client for fetching consecutive chunks of a pre-negotiated stream. This API is intended to allow
+ * efficient transfer of a large amount of data, broken up into chunks with size ranging from
+ * hundreds of KB to a few MB.
+ *
+ * Note that while this client deals with the fetching of chunks from a stream (i.e., data plane),
+ * the actual setup of the streams is done outside the scope of the transport layer. The convenience
+ * method "sendRPC" is provided to enable control plane communication between the client and server
+ * to perform this setup.
+ *
+ * For example, a typical workflow might be:
+ * client.sendRPC(new OpenFile("/foo")) --> returns StreamId = 100
+ * client.fetchChunk(streamId = 100, chunkIndex = 0, callback)
+ * client.fetchChunk(streamId = 100, chunkIndex = 1, callback)
+ * ...
+ * client.sendRPC(new CloseStream(100))
+ *
+ * Construct an instance of TransportClient using {@link TransportClientFactory}. A single
+ * TransportClient may be used for multiple streams, but any given stream must be restricted to a
+ * single client, in order to avoid out-of-order responses.
+ *
+ * NB: This class is used to make requests to the server, while {@link TransportResponseHandler} is
+ * responsible for handling responses from the server.
+ *
+ * Concurrency: thread safe and can be called from multiple threads.
+ */
+public class TransportClient implements Closeable {
+ private final Logger logger = LoggerFactory.getLogger(TransportClient.class);
+
+ private final Channel channel;
+ private final TransportResponseHandler handler;
+ @Nullable private String clientId;
+ private volatile boolean timedOut;
+
+ public TransportClient(Channel channel, TransportResponseHandler handler) {
+ this.channel = Preconditions.checkNotNull(channel);
+ this.handler = Preconditions.checkNotNull(handler);
+ this.timedOut = false;
+ }
+
+ public Channel getChannel() {
+ return channel;
+ }
+
+ public boolean isActive() {
+ return !timedOut && (channel.isOpen() || channel.isActive());
+ }
+
+ public SocketAddress getSocketAddress() {
+ return channel.remoteAddress();
+ }
+
+ /**
+ * Returns the ID used by the client to authenticate itself when authentication is enabled.
+ *
+ * @return The client ID, or null if authentication is disabled.
+ */
+ public String getClientId() {
+ return clientId;
+ }
+
+ /**
+ * Sets the authenticated client ID. This is meant to be used by the authentication layer.
+ *
+ * Trying to set a different client ID after it's been set will result in an exception.
+ */
+ public void setClientId(String id) {
+ Preconditions.checkState(clientId == null, "Client ID has already been set.");
+ this.clientId = id;
+ }
+
+ /**
+ * Requests a single chunk from the remote side, from the pre-negotiated streamId.
+ *
+ * Chunk indices go from 0 onwards. It is valid to request the same chunk multiple times, though
+ * some streams may not support this.
+ *
+ * Multiple fetchChunk requests may be outstanding simultaneously, and the chunks are guaranteed
+ * to be returned in the same order that they were requested, assuming only a single
+ * TransportClient is used to fetch the chunks.
+ *
+ * @param streamId Identifier that refers to a stream in the remote StreamManager. This should
+ * be agreed upon by client and server beforehand.
+ * @param chunkIndex 0-based index of the chunk to fetch
+ * @param callback Callback invoked upon successful receipt of chunk, or upon any failure.
+ */
+ public void fetchChunk(
+ long streamId,
+ final int chunkIndex,
+ final ChunkReceivedCallback callback) {
+ final String serverAddr = NettyUtils.getRemoteAddress(channel);
+ final long startTime = System.currentTimeMillis();
+ logger.debug("Sending fetch chunk request {} to {}", chunkIndex, serverAddr);
+
+ final StreamChunkId streamChunkId = new StreamChunkId(streamId, chunkIndex);
+ handler.addFetchRequest(streamChunkId, callback);
+
+ channel.writeAndFlush(new ChunkFetchRequest(streamChunkId)).addListener(
+ new ChannelFutureListener() {
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+ if (future.isSuccess()) {
+ long timeTaken = System.currentTimeMillis() - startTime;
+ logger.trace("Sending request {} to {} took {} ms", streamChunkId, serverAddr,
+ timeTaken);
+ } else {
+ String errorMsg = String.format("Failed to send request %s to %s: %s", streamChunkId,
+ serverAddr, future.cause());
+ logger.error(errorMsg, future.cause());
+ handler.removeFetchRequest(streamChunkId);
+ channel.close();
+ try {
+ callback.onFailure(chunkIndex, new IOException(errorMsg, future.cause()));
+ } catch (Exception e) {
+ logger.error("Uncaught exception in RPC response callback handler!", e);
+ }
+ }
+ }
+ });
+ }
+
+ /**
+ * Request to stream the data with the given stream ID from the remote end.
+ *
+ * @param streamId The stream to fetch.
+ * @param callback Object to call with the stream data.
+ */
+ public void stream(final String streamId, final StreamCallback callback) {
+ final String serverAddr = NettyUtils.getRemoteAddress(channel);
+ final long startTime = System.currentTimeMillis();
+ logger.debug("Sending stream request for {} to {}", streamId, serverAddr);
+
+ // Need to synchronize here so that the callback is added to the queue and the RPC is
+ // written to the socket atomically, so that callbacks are called in the right order
+ // when responses arrive.
+ synchronized (this) {
+ handler.addStreamCallback(callback);
+ channel.writeAndFlush(new StreamRequest(streamId)).addListener(
+ new ChannelFutureListener() {
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+ if (future.isSuccess()) {
+ long timeTaken = System.currentTimeMillis() - startTime;
+ logger.trace("Sending request for {} to {} took {} ms", streamId, serverAddr,
+ timeTaken);
+ } else {
+ String errorMsg = String.format("Failed to send request for %s to %s: %s", streamId,
+ serverAddr, future.cause());
+ logger.error(errorMsg, future.cause());
+ channel.close();
+ try {
+ callback.onFailure(streamId, new IOException(errorMsg, future.cause()));
+ } catch (Exception e) {
+ logger.error("Uncaught exception in RPC response callback handler!", e);
+ }
+ }
+ }
+ });
+ }
+ }
+
+ /**
+ * Sends an opaque message to the RpcHandler on the server-side. The callback will be invoked
+ * with the server's response or upon any failure.
+ *
+ * @param message The message to send.
+ * @param callback Callback to handle the RPC's reply.
+ * @return The RPC's id.
+ */
+ public long sendRpc(ByteBuffer message, final RpcResponseCallback callback) {
+ final String serverAddr = NettyUtils.getRemoteAddress(channel);
+ final long startTime = System.currentTimeMillis();
+ logger.trace("Sending RPC to {}", serverAddr);
+
+ final long requestId = Math.abs(UUID.randomUUID().getLeastSignificantBits());
+ handler.addRpcRequest(requestId, callback);
+
+ channel.writeAndFlush(new RpcRequest(requestId, new NioManagedBuffer(message))).addListener(
+ new ChannelFutureListener() {
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+ if (future.isSuccess()) {
+ long timeTaken = System.currentTimeMillis() - startTime;
+ logger.trace("Sending request {} to {} took {} ms", requestId, serverAddr, timeTaken);
+ } else {
+ String errorMsg = String.format("Failed to send RPC %s to %s: %s", requestId,
+ serverAddr, future.cause());
+ logger.error(errorMsg, future.cause());
+ handler.removeRpcRequest(requestId);
+ channel.close();
+ try {
+ callback.onFailure(new IOException(errorMsg, future.cause()));
+ } catch (Exception e) {
+ logger.error("Uncaught exception in RPC response callback handler!", e);
+ }
+ }
+ }
+ });
+
+ return requestId;
+ }
+
+ /**
+ * Synchronously sends an opaque message to the RpcHandler on the server-side, waiting for up to
+ * a specified timeout for a response.
+ */
+ public ByteBuffer sendRpcSync(ByteBuffer message, long timeoutMs) {
+ final SettableFuture<ByteBuffer> result = SettableFuture.create();
+
+ sendRpc(message, new RpcResponseCallback() {
+ @Override
+ public void onSuccess(ByteBuffer response) {
+ result.set(response);
+ }
+
+ @Override
+ public void onFailure(Throwable e) {
+ result.setException(e);
+ }
+ });
+
+ try {
+ return result.get(timeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw Throwables.propagate(e.getCause());
+ } catch (Exception e) {
+ throw Throwables.propagate(e);
+ }
+ }
+
+ /**
+ * Sends an opaque message to the RpcHandler on the server-side. No reply is expected for the
+ * message, and no delivery guarantees are made.
+ *
+ * @param message The message to send.
+ */
+ public void send(ByteBuffer message) {
+ channel.writeAndFlush(new OneWayMessage(new NioManagedBuffer(message)));
+ }
+
+ /**
+ * Removes any state associated with the given RPC.
+ *
+ * @param requestId The RPC id returned by {@link #sendRpc(ByteBuffer, RpcResponseCallback)}.
+ */
+ public void removeRpcRequest(long requestId) {
+ handler.removeRpcRequest(requestId);
+ }
+
+ /** Mark this channel as having timed out. */
+ public void timeOut() {
+ this.timedOut = true;
+ }
+
+ @VisibleForTesting
+ public TransportResponseHandler getHandler() {
+ return handler;
+ }
+
+ @Override
+ public void close() {
+ // close is a local operation and should finish with milliseconds; timeout just to be safe
+ channel.close().awaitUninterruptibly(10, TimeUnit.SECONDS);
+ }
+
+ @Override
+ public String toString() {
+ return Objects.toStringHelper(this)
+ .add("remoteAdress", channel.remoteAddress())
+ .add("clientId", clientId)
+ .add("isActive", isActive())
+ .toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientBootstrap.java
----------------------------------------------------------------------
diff --git a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientBootstrap.java b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientBootstrap.java
new file mode 100644
index 0000000..eaae2ee
--- /dev/null
+++ b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientBootstrap.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.client;
+
+import io.netty.channel.Channel;
+
+/**
+ * A bootstrap which is executed on a TransportClient before it is returned to the user.
+ * This enables an initial exchange of information (e.g., SASL authentication tokens) on a once-per-
+ * connection basis.
+ *
+ * Since connections (and TransportClients) are reused as much as possible, it is generally
+ * reasonable to perform an expensive bootstrapping operation, as they often share a lifespan with
+ * the JVM itself.
+ */
+public interface TransportClientBootstrap {
+ /** Performs the bootstrapping operation, throwing an exception on failure. */
+ void doBootstrap(TransportClient client, Channel channel) throws RuntimeException;
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
----------------------------------------------------------------------
diff --git a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
new file mode 100644
index 0000000..61bafc8
--- /dev/null
+++ b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.client;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Lists;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.spark.network.TransportContext;
+import org.apache.spark.network.server.TransportChannelHandler;
+import org.apache.spark.network.util.IOMode;
+import org.apache.spark.network.util.JavaUtils;
+import org.apache.spark.network.util.NettyUtils;
+import org.apache.spark.network.util.TransportConf;
+
+/**
+ * Factory for creating {@link TransportClient}s by using createClient.
+ *
+ * The factory maintains a connection pool to other hosts and should return the same
+ * TransportClient for the same remote host. It also shares a single worker thread pool for
+ * all TransportClients.
+ *
+ * TransportClients will be reused whenever possible. Prior to completing the creation of a new
+ * TransportClient, all given {@link TransportClientBootstrap}s will be run.
+ */
+public class TransportClientFactory implements Closeable {
+
+ /** A simple data structure to track the pool of clients between two peer nodes. */
+ private static class ClientPool {
+ TransportClient[] clients;
+ Object[] locks;
+
+ public ClientPool(int size) {
+ clients = new TransportClient[size];
+ locks = new Object[size];
+ for (int i = 0; i < size; i++) {
+ locks[i] = new Object();
+ }
+ }
+ }
+
+ private final Logger logger = LoggerFactory.getLogger(TransportClientFactory.class);
+
+ private final TransportContext context;
+ private final TransportConf conf;
+ private final List<TransportClientBootstrap> clientBootstraps;
+ private final ConcurrentHashMap<SocketAddress, ClientPool> connectionPool;
+
+ /** Random number generator for picking connections between peers. */
+ private final Random rand;
+ private final int numConnectionsPerPeer;
+
+ private final Class<? extends Channel> socketChannelClass;
+ private EventLoopGroup workerGroup;
+ private PooledByteBufAllocator pooledAllocator;
+
+ public TransportClientFactory(
+ TransportContext context,
+ List<TransportClientBootstrap> clientBootstraps) {
+ this.context = Preconditions.checkNotNull(context);
+ this.conf = context.getConf();
+ this.clientBootstraps = Lists.newArrayList(Preconditions.checkNotNull(clientBootstraps));
+ this.connectionPool = new ConcurrentHashMap<SocketAddress, ClientPool>();
+ this.numConnectionsPerPeer = conf.numConnectionsPerPeer();
+ this.rand = new Random();
+
+ IOMode ioMode = IOMode.valueOf(conf.ioMode());
+ this.socketChannelClass = NettyUtils.getClientChannelClass(ioMode);
+ // TODO: Make thread pool name configurable.
+ this.workerGroup = NettyUtils.createEventLoop(ioMode, conf.clientThreads(), "shuffle-client");
+ this.pooledAllocator = NettyUtils.createPooledByteBufAllocator(
+ conf.preferDirectBufs(), false /* allowCache */, conf.clientThreads());
+ }
+
+ /**
+ * Create a {@link TransportClient} connecting to the given remote host / port.
+ *
+ * We maintains an array of clients (size determined by spark.shuffle.io.numConnectionsPerPeer)
+ * and randomly picks one to use. If no client was previously created in the randomly selected
+ * spot, this function creates a new client and places it there.
+ *
+ * Prior to the creation of a new TransportClient, we will execute all
+ * {@link TransportClientBootstrap}s that are registered with this factory.
+ *
+ * This blocks until a connection is successfully established and fully bootstrapped.
+ *
+ * Concurrency: This method is safe to call from multiple threads.
+ */
+ public TransportClient createClient(String remoteHost, int remotePort) throws IOException {
+ // Get connection from the connection pool first.
+ // If it is not found or not active, create a new one.
+ final InetSocketAddress address = new InetSocketAddress(remoteHost, remotePort);
+
+ // Create the ClientPool if we don't have it yet.
+ ClientPool clientPool = connectionPool.get(address);
+ if (clientPool == null) {
+ connectionPool.putIfAbsent(address, new ClientPool(numConnectionsPerPeer));
+ clientPool = connectionPool.get(address);
+ }
+
+ int clientIndex = rand.nextInt(numConnectionsPerPeer);
+ TransportClient cachedClient = clientPool.clients[clientIndex];
+
+ if (cachedClient != null && cachedClient.isActive()) {
+ // Make sure that the channel will not timeout by updating the last use time of the
+ // handler. Then check that the client is still alive, in case it timed out before
+ // this code was able to update things.
+ TransportChannelHandler handler = cachedClient.getChannel().pipeline()
+ .get(TransportChannelHandler.class);
+ synchronized (handler) {
+ handler.getResponseHandler().updateTimeOfLastRequest();
+ }
+
+ if (cachedClient.isActive()) {
+ logger.trace("Returning cached connection to {}: {}", address, cachedClient);
+ return cachedClient;
+ }
+ }
+
+ // If we reach here, we don't have an existing connection open. Let's create a new one.
+ // Multiple threads might race here to create new connections. Keep only one of them active.
+ synchronized (clientPool.locks[clientIndex]) {
+ cachedClient = clientPool.clients[clientIndex];
+
+ if (cachedClient != null) {
+ if (cachedClient.isActive()) {
+ logger.trace("Returning cached connection to {}: {}", address, cachedClient);
+ return cachedClient;
+ } else {
+ logger.info("Found inactive connection to {}, creating a new one.", address);
+ }
+ }
+ clientPool.clients[clientIndex] = createClient(address);
+ return clientPool.clients[clientIndex];
+ }
+ }
+
+ /**
+ * Create a completely new {@link TransportClient} to the given remote host / port.
+ * This connection is not pooled.
+ *
+ * As with {@link #createClient(String, int)}, this method is blocking.
+ */
+ public TransportClient createUnmanagedClient(String remoteHost, int remotePort)
+ throws IOException {
+ final InetSocketAddress address = new InetSocketAddress(remoteHost, remotePort);
+ return createClient(address);
+ }
+
+ /** Create a completely new {@link TransportClient} to the remote address. */
+ private TransportClient createClient(InetSocketAddress address) throws IOException {
+ logger.debug("Creating new connection to " + address);
+
+ Bootstrap bootstrap = new Bootstrap();
+ bootstrap.group(workerGroup)
+ .channel(socketChannelClass)
+ // Disable Nagle's Algorithm since we don't want packets to wait
+ .option(ChannelOption.TCP_NODELAY, true)
+ .option(ChannelOption.SO_KEEPALIVE, true)
+ .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, conf.connectionTimeoutMs())
+ .option(ChannelOption.ALLOCATOR, pooledAllocator);
+
+ final AtomicReference<TransportClient> clientRef = new AtomicReference<TransportClient>();
+ final AtomicReference<Channel> channelRef = new AtomicReference<Channel>();
+
+ bootstrap.handler(new ChannelInitializer<SocketChannel>() {
+ @Override
+ public void initChannel(SocketChannel ch) {
+ TransportChannelHandler clientHandler = context.initializePipeline(ch);
+ clientRef.set(clientHandler.getClient());
+ channelRef.set(ch);
+ }
+ });
+
+ // Connect to the remote server
+ long preConnect = System.nanoTime();
+ ChannelFuture cf = bootstrap.connect(address);
+ if (!cf.awaitUninterruptibly(conf.connectionTimeoutMs())) {
+ throw new IOException(
+ String.format("Connecting to %s timed out (%s ms)", address, conf.connectionTimeoutMs()));
+ } else if (cf.cause() != null) {
+ throw new IOException(String.format("Failed to connect to %s", address), cf.cause());
+ }
+
+ TransportClient client = clientRef.get();
+ Channel channel = channelRef.get();
+ assert client != null : "Channel future completed successfully with null client";
+
+ // Execute any client bootstraps synchronously before marking the Client as successful.
+ long preBootstrap = System.nanoTime();
+ logger.debug("Connection to {} successful, running bootstraps...", address);
+ try {
+ for (TransportClientBootstrap clientBootstrap : clientBootstraps) {
+ clientBootstrap.doBootstrap(client, channel);
+ }
+ } catch (Exception e) { // catch non-RuntimeExceptions too as bootstrap may be written in Scala
+ long bootstrapTimeMs = (System.nanoTime() - preBootstrap) / 1000000;
+ logger.error("Exception while bootstrapping client after " + bootstrapTimeMs + " ms", e);
+ client.close();
+ throw Throwables.propagate(e);
+ }
+ long postBootstrap = System.nanoTime();
+
+ logger.debug("Successfully created connection to {} after {} ms ({} ms spent in bootstraps)",
+ address, (postBootstrap - preConnect) / 1000000, (postBootstrap - preBootstrap) / 1000000);
+
+ return client;
+ }
+
+ /** Close all connections in the connection pool, and shutdown the worker thread pool. */
+ @Override
+ public void close() {
+ // Go through all clients and close them if they are active.
+ for (ClientPool clientPool : connectionPool.values()) {
+ for (int i = 0; i < clientPool.clients.length; i++) {
+ TransportClient client = clientPool.clients[i];
+ if (client != null) {
+ clientPool.clients[i] = null;
+ JavaUtils.closeQuietly(client);
+ }
+ }
+ }
+ connectionPool.clear();
+
+ if (workerGroup != null) {
+ workerGroup.shutdownGracefully();
+ workerGroup = null;
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org