Posted to commits@spark.apache.org by rx...@apache.org on 2016/02/29 02:25:23 UTC

[07/14] spark git commit: [SPARK-13529][BUILD] Move network/* modules into common/network-*

http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/network/common/src/main/java/org/apache/spark/network/TransportContext.java
----------------------------------------------------------------------
diff --git a/network/common/src/main/java/org/apache/spark/network/TransportContext.java b/network/common/src/main/java/org/apache/spark/network/TransportContext.java
deleted file mode 100644
index 238710d..0000000
--- a/network/common/src/main/java/org/apache/spark/network/TransportContext.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network;
-
-import java.util.List;
-
-import com.google.common.collect.Lists;
-import io.netty.channel.Channel;
-import io.netty.channel.socket.SocketChannel;
-import io.netty.handler.timeout.IdleStateHandler;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.spark.network.client.TransportClient;
-import org.apache.spark.network.client.TransportClientBootstrap;
-import org.apache.spark.network.client.TransportClientFactory;
-import org.apache.spark.network.client.TransportResponseHandler;
-import org.apache.spark.network.protocol.MessageDecoder;
-import org.apache.spark.network.protocol.MessageEncoder;
-import org.apache.spark.network.server.RpcHandler;
-import org.apache.spark.network.server.TransportChannelHandler;
-import org.apache.spark.network.server.TransportRequestHandler;
-import org.apache.spark.network.server.TransportServer;
-import org.apache.spark.network.server.TransportServerBootstrap;
-import org.apache.spark.network.util.NettyUtils;
-import org.apache.spark.network.util.TransportConf;
-import org.apache.spark.network.util.TransportFrameDecoder;
-
-/**
- * Contains the context to create a {@link TransportServer}, {@link TransportClientFactory}, and to
- * set up Netty Channel pipelines with a {@link org.apache.spark.network.server.TransportChannelHandler}.
- *
- * The TransportClient provides two communication protocols: control-plane RPCs and data-plane
- * "chunk fetching". The handling of RPCs is performed outside the scope of the TransportContext
- * (i.e., by a user-provided handler), and that handler is responsible for setting up streams
- * which can be fetched through the data plane in chunks using zero-copy IO.
- *
- * The TransportServer and TransportClientFactory both create a TransportChannelHandler for each
- * channel. As each TransportChannelHandler contains a TransportClient, this enables server
- * processes to send messages back to the client on an existing channel.
- */
-public class TransportContext {
-  private final Logger logger = LoggerFactory.getLogger(TransportContext.class);
-
-  private final TransportConf conf;
-  private final RpcHandler rpcHandler;
-  private final boolean closeIdleConnections;
-
-  private final MessageEncoder encoder;
-  private final MessageDecoder decoder;
-
-  public TransportContext(TransportConf conf, RpcHandler rpcHandler) {
-    this(conf, rpcHandler, false);
-  }
-
-  public TransportContext(
-      TransportConf conf,
-      RpcHandler rpcHandler,
-      boolean closeIdleConnections) {
-    this.conf = conf;
-    this.rpcHandler = rpcHandler;
-    this.encoder = new MessageEncoder();
-    this.decoder = new MessageDecoder();
-    this.closeIdleConnections = closeIdleConnections;
-  }
-
-  /**
-   * Initializes a ClientFactory which runs the given TransportClientBootstraps prior to returning
-   * a new Client. Bootstraps will be executed synchronously, and must run successfully in order
-   * to create a Client.
-   */
-  public TransportClientFactory createClientFactory(List<TransportClientBootstrap> bootstraps) {
-    return new TransportClientFactory(this, bootstraps);
-  }
-
-  public TransportClientFactory createClientFactory() {
-    return createClientFactory(Lists.<TransportClientBootstrap>newArrayList());
-  }
-
-  /** Create a server which will attempt to bind to a specific port. */
-  public TransportServer createServer(int port, List<TransportServerBootstrap> bootstraps) {
-    return new TransportServer(this, null, port, rpcHandler, bootstraps);
-  }
-
-  /** Create a server which will attempt to bind to a specific host and port. */
-  public TransportServer createServer(
-      String host, int port, List<TransportServerBootstrap> bootstraps) {
-    return new TransportServer(this, host, port, rpcHandler, bootstraps);
-  }
-
-  /** Creates a new server, binding to any available ephemeral port. */
-  public TransportServer createServer(List<TransportServerBootstrap> bootstraps) {
-    return createServer(0, bootstraps);
-  }
-
-  public TransportServer createServer() {
-    return createServer(0, Lists.<TransportServerBootstrap>newArrayList());
-  }
-
-  public TransportChannelHandler initializePipeline(SocketChannel channel) {
-    return initializePipeline(channel, rpcHandler);
-  }
-
-  /**
-   * Initializes a client or server Netty Channel Pipeline which encodes/decodes messages and
-   * has a {@link org.apache.spark.network.server.TransportChannelHandler} to handle request or
-   * response messages.
-   *
-   * @param channel The channel to initialize.
-   * @param channelRpcHandler The RPC handler to use for the channel.
-   *
-   * @return Returns the created TransportChannelHandler, which includes a TransportClient that can
-   * be used to communicate on this channel. The TransportClient is directly associated with a
-   * ChannelHandler to ensure all users of the same channel get the same TransportClient object.
-   */
-  public TransportChannelHandler initializePipeline(
-      SocketChannel channel,
-      RpcHandler channelRpcHandler) {
-    try {
-      TransportChannelHandler channelHandler = createChannelHandler(channel, channelRpcHandler);
-      channel.pipeline()
-        .addLast("encoder", encoder)
-        .addLast(TransportFrameDecoder.HANDLER_NAME, NettyUtils.createFrameDecoder())
-        .addLast("decoder", decoder)
-        .addLast("idleStateHandler", new IdleStateHandler(0, 0, conf.connectionTimeoutMs() / 1000))
-        // NOTE: Chunks are currently guaranteed to be returned in the order requested, but
-        // preserving that order would require more logic if this were not on the same event loop.
-        .addLast("handler", channelHandler);
-      return channelHandler;
-    } catch (RuntimeException e) {
-      logger.error("Error while initializing Netty pipeline", e);
-      throw e;
-    }
-  }
-
-  /**
-   * Creates the server- and client-side handler which is used to handle both RequestMessages and
-   * ResponseMessages. The channel is expected to have been successfully created, though certain
-   * properties (such as the remoteAddress()) may not be available yet.
-   */
-  private TransportChannelHandler createChannelHandler(Channel channel, RpcHandler rpcHandler) {
-    TransportResponseHandler responseHandler = new TransportResponseHandler(channel);
-    TransportClient client = new TransportClient(channel, responseHandler);
-    TransportRequestHandler requestHandler = new TransportRequestHandler(channel, client,
-      rpcHandler);
-    return new TransportChannelHandler(client, responseHandler, requestHandler,
-      conf.connectionTimeoutMs(), closeIdleConnections);
-  }
-
-  public TransportConf getConf() { return conf; }
-}

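For orientation, a minimal sketch of how the TransportContext API above is typically wired up. It assumes a TransportConf named conf and an RpcHandler named rpcHandler have been built elsewhere, and that TransportServer exposes getPort(); only the factory methods shown in the file above are used.

    import org.apache.spark.network.TransportContext;
    import org.apache.spark.network.client.TransportClient;
    import org.apache.spark.network.client.TransportClientFactory;
    import org.apache.spark.network.server.TransportServer;

    // Sketch only, not a definitive usage; see assumptions above.
    TransportContext context = new TransportContext(conf, rpcHandler);
    TransportServer server = context.createServer();            // binds an ephemeral port
    TransportClientFactory factory = context.createClientFactory();
    TransportClient client =
        factory.createClient("localhost", server.getPort());    // blocks until bootstrapped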
http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/network/common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java
----------------------------------------------------------------------
diff --git a/network/common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java b/network/common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java
deleted file mode 100644
index 844eff4..0000000
--- a/network/common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.buffer;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.RandomAccessFile;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-
-import com.google.common.base.Objects;
-import com.google.common.io.ByteStreams;
-import io.netty.channel.DefaultFileRegion;
-
-import org.apache.spark.network.util.JavaUtils;
-import org.apache.spark.network.util.LimitedInputStream;
-import org.apache.spark.network.util.TransportConf;
-
-/**
- * A {@link ManagedBuffer} backed by a segment in a file.
- */
-public final class FileSegmentManagedBuffer extends ManagedBuffer {
-  private final TransportConf conf;
-  private final File file;
-  private final long offset;
-  private final long length;
-
-  public FileSegmentManagedBuffer(TransportConf conf, File file, long offset, long length) {
-    this.conf = conf;
-    this.file = file;
-    this.offset = offset;
-    this.length = length;
-  }
-
-  @Override
-  public long size() {
-    return length;
-  }
-
-  @Override
-  public ByteBuffer nioByteBuffer() throws IOException {
-    FileChannel channel = null;
-    try {
-      channel = new RandomAccessFile(file, "r").getChannel();
-      // Just copy the buffer if it's sufficiently small, as memory mapping has a high overhead.
-      if (length < conf.memoryMapBytes()) {
-        ByteBuffer buf = ByteBuffer.allocate((int) length);
-        channel.position(offset);
-        while (buf.remaining() != 0) {
-          if (channel.read(buf) == -1) {
-            throw new IOException(String.format("Reached EOF before filling buffer\n" +
-              "offset=%s\nfile=%s\nbuf.remaining=%s",
-              offset, file.getAbsoluteFile(), buf.remaining()));
-          }
-        }
-        buf.flip();
-        return buf;
-      } else {
-        return channel.map(FileChannel.MapMode.READ_ONLY, offset, length);
-      }
-    } catch (IOException e) {
-      try {
-        if (channel != null) {
-          long size = channel.size();
-          throw new IOException("Error in reading " + this + " (actual file length " + size + ")",
-            e);
-        }
-      } catch (IOException ignored) {
-        // ignore
-      }
-      throw new IOException("Error in opening " + this, e);
-    } finally {
-      JavaUtils.closeQuietly(channel);
-    }
-  }
-
-  @Override
-  public InputStream createInputStream() throws IOException {
-    FileInputStream is = null;
-    try {
-      is = new FileInputStream(file);
-      ByteStreams.skipFully(is, offset);
-      return new LimitedInputStream(is, length);
-    } catch (IOException e) {
-      try {
-        if (is != null) {
-          long size = file.length();
-          throw new IOException("Error in reading " + this + " (actual file length " + size + ")",
-              e);
-        }
-      } catch (IOException ignored) {
-        // ignore
-      } finally {
-        JavaUtils.closeQuietly(is);
-      }
-      throw new IOException("Error in opening " + this, e);
-    } catch (RuntimeException e) {
-      JavaUtils.closeQuietly(is);
-      throw e;
-    }
-  }
-
-  @Override
-  public ManagedBuffer retain() {
-    return this;
-  }
-
-  @Override
-  public ManagedBuffer release() {
-    return this;
-  }
-
-  @Override
-  public Object convertToNetty() throws IOException {
-    if (conf.lazyFileDescriptor()) {
-      return new LazyFileRegion(file, offset, length);
-    } else {
-      FileChannel fileChannel = new FileInputStream(file).getChannel();
-      return new DefaultFileRegion(fileChannel, offset, length);
-    }
-  }
-
-  public File getFile() { return file; }
-
-  public long getOffset() { return offset; }
-
-  public long getLength() { return length; }
-
-  @Override
-  public String toString() {
-    return Objects.toStringHelper(this)
-      .add("file", file)
-      .add("offset", offset)
-      .add("length", length)
-      .toString();
-  }
-}

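A short usage sketch for FileSegmentManagedBuffer: expose the first 1024 bytes of a file and read them back through createInputStream(). A TransportConf named conf is assumed to exist, and the path is hypothetical.

    import java.io.File;
    import java.io.InputStream;
    import com.google.common.io.ByteStreams;
    import org.apache.spark.network.buffer.FileSegmentManagedBuffer;
    import org.apache.spark.network.buffer.ManagedBuffer;

    // Sketch: a ManagedBuffer over bytes [0, 1024) of an on-disk file.
    File file = new File("/tmp/shuffle-data.bin");          // hypothetical path
    ManagedBuffer buf = new FileSegmentManagedBuffer(conf, file, 0, 1024);
    InputStream in = buf.createInputStream();
    try {
      byte[] bytes = new byte[(int) buf.size()];
      ByteStreams.readFully(in, bytes);                     // Guava, already a dependency
    } finally {
      in.close();
    }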
http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/network/common/src/main/java/org/apache/spark/network/buffer/LazyFileRegion.java
----------------------------------------------------------------------
diff --git a/network/common/src/main/java/org/apache/spark/network/buffer/LazyFileRegion.java b/network/common/src/main/java/org/apache/spark/network/buffer/LazyFileRegion.java
deleted file mode 100644
index 162cf6d..0000000
--- a/network/common/src/main/java/org/apache/spark/network/buffer/LazyFileRegion.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.buffer;
-
-import java.io.FileInputStream;
-import java.io.File;
-import java.io.IOException;
-import java.nio.channels.FileChannel;
-import java.nio.channels.WritableByteChannel;
-
-import com.google.common.base.Objects;
-import io.netty.channel.FileRegion;
-import io.netty.util.AbstractReferenceCounted;
-
-import org.apache.spark.network.util.JavaUtils;
-
-/**
- * A FileRegion implementation that only creates the file descriptor when the region is being
- * transferred. This cannot be used with Epoll because there is no native support for it.
- *
- * This is mostly copied from the DefaultFileRegion implementation in Netty. In the future, we
- * should push this into Netty so the native Epoll transport can support this feature.
- */
-public final class LazyFileRegion extends AbstractReferenceCounted implements FileRegion {
-
-  private final File file;
-  private final long position;
-  private final long count;
-
-  private FileChannel channel;
-
-  private long numBytesTransferred = 0L;
-
-  /**
-   * @param file file to transfer.
-   * @param position start position for the transfer.
-   * @param count number of bytes to transfer starting from position.
-   */
-  public LazyFileRegion(File file, long position, long count) {
-    this.file = file;
-    this.position = position;
-    this.count = count;
-  }
-
-  @Override
-  protected void deallocate() {
-    JavaUtils.closeQuietly(channel);
-  }
-
-  @Override
-  public long position() {
-    return position;
-  }
-
-  @Override
-  public long transfered() {
-    return numBytesTransferred;
-  }
-
-  @Override
-  public long count() {
-    return count;
-  }
-
-  @Override
-  public long transferTo(WritableByteChannel target, long position) throws IOException {
-    if (channel == null) {
-      channel = new FileInputStream(file).getChannel();
-    }
-
-    long count = this.count - position;
-    if (count < 0 || position < 0) {
-      throw new IllegalArgumentException(
-          "position out of range: " + position + " (expected: 0 - " + (count - 1) + ')');
-    }
-
-    if (count == 0) {
-      return 0L;
-    }
-
-    long written = channel.transferTo(this.position + position, count, target);
-    if (written > 0) {
-      numBytesTransferred += written;
-    }
-    return written;
-  }
-
-  @Override
-  public String toString() {
-    return Objects.toStringHelper(this)
-        .add("file", file)
-        .add("position", position)
-        .add("count", count)
-        .toString();
-  }
-}

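A sketch of draining a LazyFileRegion into another channel. The file descriptor is only opened on the first transferTo() call, which is the point of the class; paths here are hypothetical.

    import java.io.File;
    import java.io.FileOutputStream;
    import java.nio.channels.WritableByteChannel;
    import org.apache.spark.network.buffer.LazyFileRegion;

    LazyFileRegion region = new LazyFileRegion(new File("/tmp/data.bin"), 0, 4096);
    WritableByteChannel target = new FileOutputStream("/tmp/copy.bin").getChannel();
    try {
      long written = 0;
      while (written < region.count()) {
        written += region.transferTo(target, written);   // position is relative to the region
      }
    } finally {
      region.release();   // refcount 1 -> 0: deallocate() closes the lazily opened channel
      target.close();
    }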
http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/network/common/src/main/java/org/apache/spark/network/buffer/ManagedBuffer.java
----------------------------------------------------------------------
diff --git a/network/common/src/main/java/org/apache/spark/network/buffer/ManagedBuffer.java b/network/common/src/main/java/org/apache/spark/network/buffer/ManagedBuffer.java
deleted file mode 100644
index 1861f8d..0000000
--- a/network/common/src/main/java/org/apache/spark/network/buffer/ManagedBuffer.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.buffer;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.ByteBuffer;
-
-/**
- * This interface provides an immutable view for data in the form of bytes. The implementation
- * should specify how the data is provided:
- *
- * - {@link FileSegmentManagedBuffer}: data backed by part of a file
- * - {@link NioManagedBuffer}: data backed by a NIO ByteBuffer
- * - {@link NettyManagedBuffer}: data backed by a Netty ByteBuf
- *
- * The concrete buffer implementation might be managed outside the JVM garbage collector.
- * For example, in the case of {@link NettyManagedBuffer}, the buffers are reference counted.
- * In that case, if the buffer is going to be passed around to a different thread, retain/release
- * should be called.
- */
-public abstract class ManagedBuffer {
-
-  /** Number of bytes of the data. */
-  public abstract long size();
-
-  /**
-   * Exposes this buffer's data as an NIO ByteBuffer. Changing the position and limit of the
-   * returned ByteBuffer should not affect the content of this buffer.
-   */
-  // TODO: Deprecate this, usage may require expensive memory mapping or allocation.
-  public abstract ByteBuffer nioByteBuffer() throws IOException;
-
-  /**
-   * Exposes this buffer's data as an InputStream. The underlying implementation does not
-   * necessarily check for the length of bytes read, so the caller is responsible for making sure
-   * it does not go over the limit.
-   */
-  public abstract InputStream createInputStream() throws IOException;
-
-  /**
-   * Increment the reference count by one if applicable.
-   */
-  public abstract ManagedBuffer retain();
-
-  /**
-   * If applicable, decrement the reference count by one and deallocates the buffer if the
-   * reference count reaches zero.
-   */
-  public abstract ManagedBuffer release();
-
-  /**
- * Convert the buffer into a Netty object, used to write the data out. The return value is either
-   * a {@link io.netty.buffer.ByteBuf} or a {@link io.netty.channel.FileRegion}.
-   *
-   * If this method returns a ByteBuf, then that buffer's reference count will be incremented and
-   * the caller will be responsible for releasing this new reference.
-   */
-  public abstract Object convertToNetty() throws IOException;
-}

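The retain/release discipline described in the class comment matters whenever a buffer crosses threads. A sketch, assuming buffer (a ManagedBuffer) and executor (an ExecutorService) already exist; for implementations without reference counting, retain() and release() are no-ops.

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import org.apache.spark.network.buffer.ManagedBuffer;

    final ManagedBuffer ref = buffer.retain();   // take a reference for the other thread
    executor.submit(new Runnable() {
      @Override
      public void run() {
        try {
          ByteBuffer data = ref.nioByteBuffer();
          // ... consume data ...
        } catch (IOException e) {
          // log and drop
        } finally {
          ref.release();                         // balance the retain() above
        }
      }
    });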
http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/network/common/src/main/java/org/apache/spark/network/buffer/NettyManagedBuffer.java
----------------------------------------------------------------------
diff --git a/network/common/src/main/java/org/apache/spark/network/buffer/NettyManagedBuffer.java b/network/common/src/main/java/org/apache/spark/network/buffer/NettyManagedBuffer.java
deleted file mode 100644
index 4c8802a..0000000
--- a/network/common/src/main/java/org/apache/spark/network/buffer/NettyManagedBuffer.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.buffer;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.ByteBuffer;
-
-import com.google.common.base.Objects;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufInputStream;
-
-/**
- * A {@link ManagedBuffer} backed by a Netty {@link ByteBuf}.
- */
-public final class NettyManagedBuffer extends ManagedBuffer {
-  private final ByteBuf buf;
-
-  public NettyManagedBuffer(ByteBuf buf) {
-    this.buf = buf;
-  }
-
-  @Override
-  public long size() {
-    return buf.readableBytes();
-  }
-
-  @Override
-  public ByteBuffer nioByteBuffer() throws IOException {
-    return buf.nioBuffer();
-  }
-
-  @Override
-  public InputStream createInputStream() throws IOException {
-    return new ByteBufInputStream(buf);
-  }
-
-  @Override
-  public ManagedBuffer retain() {
-    buf.retain();
-    return this;
-  }
-
-  @Override
-  public ManagedBuffer release() {
-    buf.release();
-    return this;
-  }
-
-  @Override
-  public Object convertToNetty() throws IOException {
-    return buf.duplicate().retain();
-  }
-
-  @Override
-  public String toString() {
-    return Objects.toStringHelper(this)
-      .add("buf", buf)
-      .toString();
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/network/common/src/main/java/org/apache/spark/network/buffer/NioManagedBuffer.java
----------------------------------------------------------------------
diff --git a/network/common/src/main/java/org/apache/spark/network/buffer/NioManagedBuffer.java b/network/common/src/main/java/org/apache/spark/network/buffer/NioManagedBuffer.java
deleted file mode 100644
index 631d767..0000000
--- a/network/common/src/main/java/org/apache/spark/network/buffer/NioManagedBuffer.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.buffer;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.ByteBuffer;
-
-import com.google.common.base.Objects;
-import io.netty.buffer.ByteBufInputStream;
-import io.netty.buffer.Unpooled;
-
-/**
- * A {@link ManagedBuffer} backed by {@link ByteBuffer}.
- */
-public class NioManagedBuffer extends ManagedBuffer {
-  private final ByteBuffer buf;
-
-  public NioManagedBuffer(ByteBuffer buf) {
-    this.buf = buf;
-  }
-
-  @Override
-  public long size() {
-    return buf.remaining();
-  }
-
-  @Override
-  public ByteBuffer nioByteBuffer() throws IOException {
-    return buf.duplicate();
-  }
-
-  @Override
-  public InputStream createInputStream() throws IOException {
-    return new ByteBufInputStream(Unpooled.wrappedBuffer(buf));
-  }
-
-  @Override
-  public ManagedBuffer retain() {
-    return this;
-  }
-
-  @Override
-  public ManagedBuffer release() {
-    return this;
-  }
-
-  @Override
-  public Object convertToNetty() throws IOException {
-    return Unpooled.wrappedBuffer(buf);
-  }
-
-  @Override
-  public String toString() {
-    return Objects.toStringHelper(this)
-      .add("buf", buf)
-      .toString();
-  }
-}
-

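For completeness, a sketch of wrapping an in-memory payload as a ManagedBuffer; this is the same pattern TransportClient.sendRpc uses when it builds an RpcRequest (see the TransportClient diff further down).

    import java.nio.ByteBuffer;
    import org.apache.spark.network.buffer.ManagedBuffer;
    import org.apache.spark.network.buffer.NioManagedBuffer;

    ByteBuffer payload = ByteBuffer.wrap(new byte[] { 1, 2, 3, 4 });
    ManagedBuffer buf = new NioManagedBuffer(payload);   // size() == 4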
http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/network/common/src/main/java/org/apache/spark/network/client/ChunkFetchFailureException.java
----------------------------------------------------------------------
diff --git a/network/common/src/main/java/org/apache/spark/network/client/ChunkFetchFailureException.java b/network/common/src/main/java/org/apache/spark/network/client/ChunkFetchFailureException.java
deleted file mode 100644
index 1fbdcd6..0000000
--- a/network/common/src/main/java/org/apache/spark/network/client/ChunkFetchFailureException.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.client;
-
-/**
- * General exception caused by a remote exception while fetching a chunk.
- */
-public class ChunkFetchFailureException extends RuntimeException {
-  public ChunkFetchFailureException(String errorMsg, Throwable cause) {
-    super(errorMsg, cause);
-  }
-
-  public ChunkFetchFailureException(String errorMsg) {
-    super(errorMsg);
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/network/common/src/main/java/org/apache/spark/network/client/ChunkReceivedCallback.java
----------------------------------------------------------------------
diff --git a/network/common/src/main/java/org/apache/spark/network/client/ChunkReceivedCallback.java b/network/common/src/main/java/org/apache/spark/network/client/ChunkReceivedCallback.java
deleted file mode 100644
index 519e6cb..0000000
--- a/network/common/src/main/java/org/apache/spark/network/client/ChunkReceivedCallback.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.client;
-
-import org.apache.spark.network.buffer.ManagedBuffer;
-
-/**
- * Callback for the result of a single chunk fetch. For a single stream, the callbacks are
- * guaranteed to be called by the same thread in the same order as the requests for chunks were
- * made.
- *
- * Note that if a general stream failure occurs, all outstanding chunk requests may be failed.
- */
-public interface ChunkReceivedCallback {
-  /**
-   * Called upon receipt of a particular chunk.
-   *
-   * The given buffer will initially have a refcount of 1, but will be release()'d as soon as this
-   * call returns. You must therefore either retain() the buffer or copy its contents before
-   * returning.
-   */
-  void onSuccess(int chunkIndex, ManagedBuffer buffer);
-
-  /**
-   * Called upon failure to fetch a particular chunk. Note that this may actually be called due
-   * to failure to fetch a prior chunk in this stream.
-   *
-   * After receiving a failure, the stream may or may not be valid. The client should not assume
-   * that the server's side of the stream has been closed.
-   */
-  void onFailure(int chunkIndex, Throwable e);
-}

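A sketch of a callback honoring the contract above: the buffer is release()'d as soon as onSuccess returns, so it must be retain()'d before being handed off. chunkQueue (a BlockingQueue<ManagedBuffer>) and logger are hypothetical.

    import org.apache.spark.network.buffer.ManagedBuffer;
    import org.apache.spark.network.client.ChunkReceivedCallback;

    ChunkReceivedCallback callback = new ChunkReceivedCallback() {
      @Override
      public void onSuccess(int chunkIndex, ManagedBuffer buffer) {
        chunkQueue.offer(buffer.retain());   // the consumer must release() later
      }

      @Override
      public void onFailure(int chunkIndex, Throwable e) {
        logger.error("Failed to fetch chunk " + chunkIndex, e);
      }
    };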
http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/network/common/src/main/java/org/apache/spark/network/client/RpcResponseCallback.java
----------------------------------------------------------------------
diff --git a/network/common/src/main/java/org/apache/spark/network/client/RpcResponseCallback.java b/network/common/src/main/java/org/apache/spark/network/client/RpcResponseCallback.java
deleted file mode 100644
index 47e93f9..0000000
--- a/network/common/src/main/java/org/apache/spark/network/client/RpcResponseCallback.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.client;
-
-import java.nio.ByteBuffer;
-
-/**
- * Callback for the result of a single RPC. This will be invoked once with either success or
- * failure.
- */
-public interface RpcResponseCallback {
-  /** Successful serialized result from server. */
-  void onSuccess(ByteBuffer response);
-
-  /** Exception either propagated from server or raised on client side. */
-  void onFailure(Throwable e);
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/network/common/src/main/java/org/apache/spark/network/client/StreamCallback.java
----------------------------------------------------------------------
diff --git a/network/common/src/main/java/org/apache/spark/network/client/StreamCallback.java b/network/common/src/main/java/org/apache/spark/network/client/StreamCallback.java
deleted file mode 100644
index 29e6a30..0000000
--- a/network/common/src/main/java/org/apache/spark/network/client/StreamCallback.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.client;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-/**
- * Callback for streaming data. Stream data will be offered to the {@link #onData(String, ByteBuffer)}
- * method as it arrives. Once all the stream data is received, {@link #onComplete(String)} will be
- * called.
- * <p>
- * The network library guarantees that a single thread will call these methods at a time, but
- * different calls may be made by different threads.
- */
-public interface StreamCallback {
-  /** Called upon receipt of stream data. */
-  void onData(String streamId, ByteBuffer buf) throws IOException;
-
-  /** Called when all data from the stream has been received. */
-  void onComplete(String streamId) throws IOException;
-
-  /** Called if there's an error reading data from the stream. */
-  void onFailure(String streamId, Throwable cause) throws IOException;
-}

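A sketch of a StreamCallback that spills stream data to a local file. Because the library serializes these calls (one thread at a time), no locking is needed around the channel; the path is hypothetical.

    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.WritableByteChannel;
    import org.apache.spark.network.client.StreamCallback;

    final WritableByteChannel out = new FileOutputStream("/tmp/stream-out.bin").getChannel();
    StreamCallback callback = new StreamCallback() {
      @Override
      public void onData(String streamId, ByteBuffer buf) throws IOException {
        while (buf.hasRemaining()) {
          out.write(buf);
        }
      }

      @Override
      public void onComplete(String streamId) throws IOException {
        out.close();
      }

      @Override
      public void onFailure(String streamId, Throwable cause) throws IOException {
        out.close();
      }
    };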
http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/network/common/src/main/java/org/apache/spark/network/client/StreamInterceptor.java
----------------------------------------------------------------------
diff --git a/network/common/src/main/java/org/apache/spark/network/client/StreamInterceptor.java b/network/common/src/main/java/org/apache/spark/network/client/StreamInterceptor.java
deleted file mode 100644
index 88ba3cc..0000000
--- a/network/common/src/main/java/org/apache/spark/network/client/StreamInterceptor.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.client;
-
-import java.nio.ByteBuffer;
-import java.nio.channels.ClosedChannelException;
-
-import io.netty.buffer.ByteBuf;
-
-import org.apache.spark.network.util.TransportFrameDecoder;
-
-/**
- * An interceptor that is registered with the frame decoder to feed stream data to a
- * callback.
- */
-class StreamInterceptor implements TransportFrameDecoder.Interceptor {
-
-  private final TransportResponseHandler handler;
-  private final String streamId;
-  private final long byteCount;
-  private final StreamCallback callback;
-
-  private volatile long bytesRead;
-
-  StreamInterceptor(
-      TransportResponseHandler handler,
-      String streamId,
-      long byteCount,
-      StreamCallback callback) {
-    this.handler = handler;
-    this.streamId = streamId;
-    this.byteCount = byteCount;
-    this.callback = callback;
-    this.bytesRead = 0;
-  }
-
-  @Override
-  public void exceptionCaught(Throwable cause) throws Exception {
-    handler.deactivateStream();
-    callback.onFailure(streamId, cause);
-  }
-
-  @Override
-  public void channelInactive() throws Exception {
-    handler.deactivateStream();
-    callback.onFailure(streamId, new ClosedChannelException());
-  }
-
-  @Override
-  public boolean handle(ByteBuf buf) throws Exception {
-    int toRead = (int) Math.min(buf.readableBytes(), byteCount - bytesRead);
-    ByteBuffer nioBuffer = buf.readSlice(toRead).nioBuffer();
-
-    int available = nioBuffer.remaining();
-    callback.onData(streamId, nioBuffer);
-    bytesRead += available;
-    if (bytesRead > byteCount) {
-      RuntimeException re = new IllegalStateException(String.format(
-        "Read too many bytes? Expected %d, but read %d.", byteCount, bytesRead));
-      callback.onFailure(streamId, re);
-      handler.deactivateStream();
-      throw re;
-    } else if (bytesRead == byteCount) {
-      handler.deactivateStream();
-      callback.onComplete(streamId);
-    }
-
-    return bytesRead != byteCount;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/network/common/src/main/java/org/apache/spark/network/client/TransportClient.java
----------------------------------------------------------------------
diff --git a/network/common/src/main/java/org/apache/spark/network/client/TransportClient.java b/network/common/src/main/java/org/apache/spark/network/client/TransportClient.java
deleted file mode 100644
index e15f096..0000000
--- a/network/common/src/main/java/org/apache/spark/network/client/TransportClient.java
+++ /dev/null
@@ -1,321 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.client;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.net.SocketAddress;
-import java.nio.ByteBuffer;
-import java.util.UUID;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import javax.annotation.Nullable;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Objects;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
-import com.google.common.util.concurrent.SettableFuture;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.spark.network.buffer.NioManagedBuffer;
-import org.apache.spark.network.protocol.ChunkFetchRequest;
-import org.apache.spark.network.protocol.OneWayMessage;
-import org.apache.spark.network.protocol.RpcRequest;
-import org.apache.spark.network.protocol.StreamChunkId;
-import org.apache.spark.network.protocol.StreamRequest;
-import org.apache.spark.network.util.NettyUtils;
-
-/**
- * Client for fetching consecutive chunks of a pre-negotiated stream. This API is intended to allow
- * efficient transfer of a large amount of data, broken up into chunks with size ranging from
- * hundreds of KB to a few MB.
- *
- * Note that while this client deals with the fetching of chunks from a stream (i.e., data plane),
- * the actual setup of the streams is done outside the scope of the transport layer. The convenience
- * method "sendRpc" is provided to enable control-plane communication between the client and server
- * to perform this setup.
- *
- * For example, a typical workflow might be:
- * client.sendRPC(new OpenFile("/foo")) --&gt; returns StreamId = 100
- * client.fetchChunk(streamId = 100, chunkIndex = 0, callback)
- * client.fetchChunk(streamId = 100, chunkIndex = 1, callback)
- * ...
- * client.sendRPC(new CloseStream(100))
- *
- * Construct an instance of TransportClient using {@link TransportClientFactory}. A single
- * TransportClient may be used for multiple streams, but any given stream must be restricted to a
- * single client, in order to avoid out-of-order responses.
- *
- * NB: This class is used to make requests to the server, while {@link TransportResponseHandler} is
- * responsible for handling responses from the server.
- *
- * Concurrency: thread safe and can be called from multiple threads.
- */
-public class TransportClient implements Closeable {
-  private final Logger logger = LoggerFactory.getLogger(TransportClient.class);
-
-  private final Channel channel;
-  private final TransportResponseHandler handler;
-  @Nullable private String clientId;
-  private volatile boolean timedOut;
-
-  public TransportClient(Channel channel, TransportResponseHandler handler) {
-    this.channel = Preconditions.checkNotNull(channel);
-    this.handler = Preconditions.checkNotNull(handler);
-    this.timedOut = false;
-  }
-
-  public Channel getChannel() {
-    return channel;
-  }
-
-  public boolean isActive() {
-    return !timedOut && (channel.isOpen() || channel.isActive());
-  }
-
-  public SocketAddress getSocketAddress() {
-    return channel.remoteAddress();
-  }
-
-  /**
-   * Returns the ID used by the client to authenticate itself when authentication is enabled.
-   *
-   * @return The client ID, or null if authentication is disabled.
-   */
-  public String getClientId() {
-    return clientId;
-  }
-
-  /**
-   * Sets the authenticated client ID. This is meant to be used by the authentication layer.
-   *
-   * Trying to set a different client ID after it's been set will result in an exception.
-   */
-  public void setClientId(String id) {
-    Preconditions.checkState(clientId == null, "Client ID has already been set.");
-    this.clientId = id;
-  }
-
-  /**
-   * Requests a single chunk from the remote side, from the pre-negotiated streamId.
-   *
-   * Chunk indices go from 0 onwards. It is valid to request the same chunk multiple times, though
-   * some streams may not support this.
-   *
-   * Multiple fetchChunk requests may be outstanding simultaneously, and the chunks are guaranteed
-   * to be returned in the same order that they were requested, assuming only a single
-   * TransportClient is used to fetch the chunks.
-   *
-   * @param streamId Identifier that refers to a stream in the remote StreamManager. This should
-   *                 be agreed upon by client and server beforehand.
-   * @param chunkIndex 0-based index of the chunk to fetch
-   * @param callback Callback invoked upon successful receipt of chunk, or upon any failure.
-   */
-  public void fetchChunk(
-      long streamId,
-      final int chunkIndex,
-      final ChunkReceivedCallback callback) {
-    final String serverAddr = NettyUtils.getRemoteAddress(channel);
-    final long startTime = System.currentTimeMillis();
-    logger.debug("Sending fetch chunk request {} to {}", chunkIndex, serverAddr);
-
-    final StreamChunkId streamChunkId = new StreamChunkId(streamId, chunkIndex);
-    handler.addFetchRequest(streamChunkId, callback);
-
-    channel.writeAndFlush(new ChunkFetchRequest(streamChunkId)).addListener(
-      new ChannelFutureListener() {
-        @Override
-        public void operationComplete(ChannelFuture future) throws Exception {
-          if (future.isSuccess()) {
-            long timeTaken = System.currentTimeMillis() - startTime;
-            logger.trace("Sending request {} to {} took {} ms", streamChunkId, serverAddr,
-              timeTaken);
-          } else {
-            String errorMsg = String.format("Failed to send request %s to %s: %s", streamChunkId,
-              serverAddr, future.cause());
-            logger.error(errorMsg, future.cause());
-            handler.removeFetchRequest(streamChunkId);
-            channel.close();
-            try {
-              callback.onFailure(chunkIndex, new IOException(errorMsg, future.cause()));
-            } catch (Exception e) {
-              logger.error("Uncaught exception in RPC response callback handler!", e);
-            }
-          }
-        }
-      });
-  }
-
-  /**
-   * Request to stream the data with the given stream ID from the remote end.
-   *
-   * @param streamId The stream to fetch.
-   * @param callback Object to call with the stream data.
-   */
-  public void stream(final String streamId, final StreamCallback callback) {
-    final String serverAddr = NettyUtils.getRemoteAddress(channel);
-    final long startTime = System.currentTimeMillis();
-    logger.debug("Sending stream request for {} to {}", streamId, serverAddr);
-
-    // Need to synchronize here so that the callback is added to the queue and the RPC is
-    // written to the socket atomically, so that callbacks are called in the right order
-    // when responses arrive.
-    synchronized (this) {
-      handler.addStreamCallback(callback);
-      channel.writeAndFlush(new StreamRequest(streamId)).addListener(
-        new ChannelFutureListener() {
-          @Override
-          public void operationComplete(ChannelFuture future) throws Exception {
-            if (future.isSuccess()) {
-              long timeTaken = System.currentTimeMillis() - startTime;
-              logger.trace("Sending request for {} to {} took {} ms", streamId, serverAddr,
-                timeTaken);
-            } else {
-              String errorMsg = String.format("Failed to send request for %s to %s: %s", streamId,
-                serverAddr, future.cause());
-              logger.error(errorMsg, future.cause());
-              channel.close();
-              try {
-                callback.onFailure(streamId, new IOException(errorMsg, future.cause()));
-              } catch (Exception e) {
-                logger.error("Uncaught exception in RPC response callback handler!", e);
-              }
-            }
-          }
-        });
-    }
-  }
-
-  /**
-   * Sends an opaque message to the RpcHandler on the server-side. The callback will be invoked
-   * with the server's response or upon any failure.
-   *
-   * @param message The message to send.
-   * @param callback Callback to handle the RPC's reply.
-   * @return The RPC's id.
-   */
-  public long sendRpc(ByteBuffer message, final RpcResponseCallback callback) {
-    final String serverAddr = NettyUtils.getRemoteAddress(channel);
-    final long startTime = System.currentTimeMillis();
-    logger.trace("Sending RPC to {}", serverAddr);
-
-    final long requestId = Math.abs(UUID.randomUUID().getLeastSignificantBits());
-    handler.addRpcRequest(requestId, callback);
-
-    channel.writeAndFlush(new RpcRequest(requestId, new NioManagedBuffer(message))).addListener(
-      new ChannelFutureListener() {
-        @Override
-        public void operationComplete(ChannelFuture future) throws Exception {
-          if (future.isSuccess()) {
-            long timeTaken = System.currentTimeMillis() - startTime;
-            logger.trace("Sending request {} to {} took {} ms", requestId, serverAddr, timeTaken);
-          } else {
-            String errorMsg = String.format("Failed to send RPC %s to %s: %s", requestId,
-              serverAddr, future.cause());
-            logger.error(errorMsg, future.cause());
-            handler.removeRpcRequest(requestId);
-            channel.close();
-            try {
-              callback.onFailure(new IOException(errorMsg, future.cause()));
-            } catch (Exception e) {
-              logger.error("Uncaught exception in RPC response callback handler!", e);
-            }
-          }
-        }
-      });
-
-    return requestId;
-  }
-
-  /**
-   * Synchronously sends an opaque message to the RpcHandler on the server-side, waiting for up to
-   * a specified timeout for a response.
-   */
-  public ByteBuffer sendRpcSync(ByteBuffer message, long timeoutMs) {
-    final SettableFuture<ByteBuffer> result = SettableFuture.create();
-
-    sendRpc(message, new RpcResponseCallback() {
-      @Override
-      public void onSuccess(ByteBuffer response) {
-        result.set(response);
-      }
-
-      @Override
-      public void onFailure(Throwable e) {
-        result.setException(e);
-      }
-    });
-
-    try {
-      return result.get(timeoutMs, TimeUnit.MILLISECONDS);
-    } catch (ExecutionException e) {
-      throw Throwables.propagate(e.getCause());
-    } catch (Exception e) {
-      throw Throwables.propagate(e);
-    }
-  }
-
-  /**
-   * Sends an opaque message to the RpcHandler on the server-side. No reply is expected for the
-   * message, and no delivery guarantees are made.
-   *
-   * @param message The message to send.
-   */
-  public void send(ByteBuffer message) {
-    channel.writeAndFlush(new OneWayMessage(new NioManagedBuffer(message)));
-  }
-
-  /**
-   * Removes any state associated with the given RPC.
-   *
-   * @param requestId The RPC id returned by {@link #sendRpc(ByteBuffer, RpcResponseCallback)}.
-   */
-  public void removeRpcRequest(long requestId) {
-    handler.removeRpcRequest(requestId);
-  }
-
-  /** Mark this channel as having timed out. */
-  public void timeOut() {
-    this.timedOut = true;
-  }
-
-  @VisibleForTesting
-  public TransportResponseHandler getHandler() {
-    return handler;
-  }
-
-  @Override
-  public void close() {
-    // close is a local operation and should finish within milliseconds; timeout just to be safe
-    channel.close().awaitUninterruptibly(10, TimeUnit.SECONDS);
-  }
-
-  @Override
-  public String toString() {
-    return Objects.toStringHelper(this)
-      .add("remoteAddress", channel.remoteAddress())
-      .add("clientId", clientId)
-      .add("isActive", isActive())
-      .toString();
-  }
-}

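A sketch of the control-plane/data-plane workflow from the class comment: negotiate a stream id over RPC, then fetch its chunks. The "open" message encoding and the response layout are application-defined and hypothetical here; callback is a ChunkReceivedCallback like the one sketched earlier, and only the client calls come from the API above.

    import java.nio.ByteBuffer;
    import com.google.common.base.Charsets;

    ByteBuffer open = ByteBuffer.wrap("open /foo".getBytes(Charsets.UTF_8));
    ByteBuffer response = client.sendRpcSync(open, 10000 /* timeout ms */);
    long streamId = response.getLong();            // assumed response layout
    client.fetchChunk(streamId, 0, callback);      // chunks come back in request order
    client.fetchChunk(streamId, 1, callback);      // when a single client is used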
http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/network/common/src/main/java/org/apache/spark/network/client/TransportClientBootstrap.java
----------------------------------------------------------------------
diff --git a/network/common/src/main/java/org/apache/spark/network/client/TransportClientBootstrap.java b/network/common/src/main/java/org/apache/spark/network/client/TransportClientBootstrap.java
deleted file mode 100644
index eaae2ee..0000000
--- a/network/common/src/main/java/org/apache/spark/network/client/TransportClientBootstrap.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.client;
-
-import io.netty.channel.Channel;
-
-/**
- * A bootstrap which is executed on a TransportClient before it is returned to the user.
- * This enables an initial exchange of information (e.g., SASL authentication tokens) on a once-per-
- * connection basis.
- *
- * Since connections (and TransportClients) are reused as much as possible, it is generally
- * reasonable to perform an expensive bootstrapping operation, as connections often share a
- * lifespan with the JVM itself.
- */
-public interface TransportClientBootstrap {
-  /** Performs the bootstrapping operation, throwing an exception on failure. */
-  void doBootstrap(TransportClient client, Channel channel) throws RuntimeException;
-}

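A sketch of a trivial bootstrap that sends a one-way hello on each new connection before the client is handed to callers; SASL authentication is the real-world example cited above. The message content is hypothetical, and context is an existing TransportContext.

    import java.nio.ByteBuffer;
    import com.google.common.base.Charsets;
    import com.google.common.collect.Lists;
    import io.netty.channel.Channel;
    import org.apache.spark.network.client.TransportClient;
    import org.apache.spark.network.client.TransportClientBootstrap;
    import org.apache.spark.network.client.TransportClientFactory;

    TransportClientBootstrap hello = new TransportClientBootstrap() {
      @Override
      public void doBootstrap(TransportClient client, Channel channel) {
        client.send(ByteBuffer.wrap("hello".getBytes(Charsets.UTF_8)));
      }
    };
    TransportClientFactory factory =
        context.createClientFactory(Lists.<TransportClientBootstrap>newArrayList(hello));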
http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
----------------------------------------------------------------------
diff --git a/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java b/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
deleted file mode 100644
index 61bafc8..0000000
--- a/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
+++ /dev/null
@@ -1,264 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.client;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.util.List;
-import java.util.Random;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicReference;
-
-import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
-import com.google.common.collect.Lists;
-import io.netty.bootstrap.Bootstrap;
-import io.netty.buffer.PooledByteBufAllocator;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.socket.SocketChannel;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.spark.network.TransportContext;
-import org.apache.spark.network.server.TransportChannelHandler;
-import org.apache.spark.network.util.IOMode;
-import org.apache.spark.network.util.JavaUtils;
-import org.apache.spark.network.util.NettyUtils;
-import org.apache.spark.network.util.TransportConf;
-
-/**
- * Factory for creating {@link TransportClient}s by using createClient.
- *
- * The factory maintains a connection pool to other hosts and should return the same
- * TransportClient for the same remote host. It also shares a single worker thread pool for
- * all TransportClients.
- *
- * TransportClients will be reused whenever possible. Prior to completing the creation of a new
- * TransportClient, all given {@link TransportClientBootstrap}s will be run.
- */
-public class TransportClientFactory implements Closeable {
-
-  /** A simple data structure to track the pool of clients between two peer nodes. */
-  private static class ClientPool {
-    TransportClient[] clients;
-    Object[] locks;
-
-    public ClientPool(int size) {
-      clients = new TransportClient[size];
-      locks = new Object[size];
-      for (int i = 0; i < size; i++) {
-        locks[i] = new Object();
-      }
-    }
-  }
-
-  private final Logger logger = LoggerFactory.getLogger(TransportClientFactory.class);
-
-  private final TransportContext context;
-  private final TransportConf conf;
-  private final List<TransportClientBootstrap> clientBootstraps;
-  private final ConcurrentHashMap<SocketAddress, ClientPool> connectionPool;
-
-  /** Random number generator for picking connections between peers. */
-  private final Random rand;
-  private final int numConnectionsPerPeer;
-
-  private final Class<? extends Channel> socketChannelClass;
-  private EventLoopGroup workerGroup;
-  private PooledByteBufAllocator pooledAllocator;
-
-  public TransportClientFactory(
-      TransportContext context,
-      List<TransportClientBootstrap> clientBootstraps) {
-    this.context = Preconditions.checkNotNull(context);
-    this.conf = context.getConf();
-    this.clientBootstraps = Lists.newArrayList(Preconditions.checkNotNull(clientBootstraps));
-    this.connectionPool = new ConcurrentHashMap<SocketAddress, ClientPool>();
-    this.numConnectionsPerPeer = conf.numConnectionsPerPeer();
-    this.rand = new Random();
-
-    IOMode ioMode = IOMode.valueOf(conf.ioMode());
-    this.socketChannelClass = NettyUtils.getClientChannelClass(ioMode);
-    // TODO: Make thread pool name configurable.
-    this.workerGroup = NettyUtils.createEventLoop(ioMode, conf.clientThreads(), "shuffle-client");
-    this.pooledAllocator = NettyUtils.createPooledByteBufAllocator(
-      conf.preferDirectBufs(), false /* allowCache */, conf.clientThreads());
-  }
-
-  /**
-   * Create a {@link TransportClient} connecting to the given remote host / port.
-   *
-   * We maintain an array of clients (size determined by spark.shuffle.io.numConnectionsPerPeer)
-   * and randomly picks one to use. If no client was previously created in the randomly selected
-   * spot, this function creates a new client and places it there.
-   *
-   * Prior to the creation of a new TransportClient, we will execute all
-   * {@link TransportClientBootstrap}s that are registered with this factory.
-   *
-   * This blocks until a connection is successfully established and fully bootstrapped.
-   *
-   * Concurrency: This method is safe to call from multiple threads.
-   */
-  public TransportClient createClient(String remoteHost, int remotePort) throws IOException {
-    // Get connection from the connection pool first.
-    // If it is not found or not active, create a new one.
-    final InetSocketAddress address = new InetSocketAddress(remoteHost, remotePort);
-
-    // Create the ClientPool if we don't have it yet.
-    ClientPool clientPool = connectionPool.get(address);
-    if (clientPool == null) {
-      connectionPool.putIfAbsent(address, new ClientPool(numConnectionsPerPeer));
-      clientPool = connectionPool.get(address);
-    }
-
-    int clientIndex = rand.nextInt(numConnectionsPerPeer);
-    TransportClient cachedClient = clientPool.clients[clientIndex];
-
-    if (cachedClient != null && cachedClient.isActive()) {
-      // Make sure that the channel will not time out by updating the last use time of the
-      // handler. Then check that the client is still alive, in case it timed out before
-      // this code was able to update things.
-      TransportChannelHandler handler = cachedClient.getChannel().pipeline()
-        .get(TransportChannelHandler.class);
-      synchronized (handler) {
-        handler.getResponseHandler().updateTimeOfLastRequest();
-      }
-
-      if (cachedClient.isActive()) {
-        logger.trace("Returning cached connection to {}: {}", address, cachedClient);
-        return cachedClient;
-      }
-    }
-
-    // If we reach here, we don't have an existing connection open. Let's create a new one.
-    // Multiple threads might race here to create new connections. Keep only one of them active.
-    synchronized (clientPool.locks[clientIndex]) {
-      cachedClient = clientPool.clients[clientIndex];
-
-      if (cachedClient != null) {
-        if (cachedClient.isActive()) {
-          logger.trace("Returning cached connection to {}: {}", address, cachedClient);
-          return cachedClient;
-        } else {
-          logger.info("Found inactive connection to {}, creating a new one.", address);
-        }
-      }
-      clientPool.clients[clientIndex] = createClient(address);
-      return clientPool.clients[clientIndex];
-    }
-  }
-
-  /**
-   * Create a completely new {@link TransportClient} to the given remote host / port.
-   * This connection is not pooled.
-   *
-   * As with {@link #createClient(String, int)}, this method is blocking.
-   */
-  public TransportClient createUnmanagedClient(String remoteHost, int remotePort)
-      throws IOException {
-    final InetSocketAddress address = new InetSocketAddress(remoteHost, remotePort);
-    return createClient(address);
-  }
-
-  /** Create a completely new {@link TransportClient} to the remote address. */
-  private TransportClient createClient(InetSocketAddress address) throws IOException {
-    logger.debug("Creating new connection to " + address);
-
-    Bootstrap bootstrap = new Bootstrap();
-    bootstrap.group(workerGroup)
-      .channel(socketChannelClass)
-      // Disable Nagle's Algorithm since we don't want packets to wait
-      .option(ChannelOption.TCP_NODELAY, true)
-      .option(ChannelOption.SO_KEEPALIVE, true)
-      .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, conf.connectionTimeoutMs())
-      .option(ChannelOption.ALLOCATOR, pooledAllocator);
-
-    final AtomicReference<TransportClient> clientRef = new AtomicReference<TransportClient>();
-    final AtomicReference<Channel> channelRef = new AtomicReference<Channel>();
-
-    bootstrap.handler(new ChannelInitializer<SocketChannel>() {
-      @Override
-      public void initChannel(SocketChannel ch) {
-        TransportChannelHandler clientHandler = context.initializePipeline(ch);
-        clientRef.set(clientHandler.getClient());
-        channelRef.set(ch);
-      }
-    });
-
-    // Connect to the remote server
-    long preConnect = System.nanoTime();
-    ChannelFuture cf = bootstrap.connect(address);
-    if (!cf.awaitUninterruptibly(conf.connectionTimeoutMs())) {
-      throw new IOException(
-        String.format("Connecting to %s timed out (%s ms)", address, conf.connectionTimeoutMs()));
-    } else if (cf.cause() != null) {
-      throw new IOException(String.format("Failed to connect to %s", address), cf.cause());
-    }
-
-    TransportClient client = clientRef.get();
-    Channel channel = channelRef.get();
-    assert client != null : "Channel future completed successfully with null client";
-
-    // Execute any client bootstraps synchronously before marking the Client as successful.
-    long preBootstrap = System.nanoTime();
-    logger.debug("Connection to {} successful, running bootstraps...", address);
-    try {
-      for (TransportClientBootstrap clientBootstrap : clientBootstraps) {
-        clientBootstrap.doBootstrap(client, channel);
-      }
-    } catch (Exception e) { // catch non-RuntimeExceptions too as bootstrap may be written in Scala
-      long bootstrapTimeMs = (System.nanoTime() - preBootstrap) / 1000000;
-      logger.error("Exception while bootstrapping client after " + bootstrapTimeMs + " ms", e);
-      client.close();
-      throw Throwables.propagate(e);
-    }
-    long postBootstrap = System.nanoTime();
-
-    logger.debug("Successfully created connection to {} after {} ms ({} ms spent in bootstraps)",
-      address, (postBootstrap - preConnect) / 1000000, (postBootstrap - preBootstrap) / 1000000);
-
-    return client;
-  }
-
-  /** Close all connections in the connection pool, and shutdown the worker thread pool. */
-  @Override
-  public void close() {
-    // Go through all clients and close them if they are active.
-    for (ClientPool clientPool : connectionPool.values()) {
-      for (int i = 0; i < clientPool.clients.length; i++) {
-        TransportClient client = clientPool.clients[i];
-        if (client != null) {
-          clientPool.clients[i] = null;
-          JavaUtils.closeQuietly(client);
-        }
-      }
-    }
-    connectionPool.clear();
-
-    if (workerGroup != null) {
-      workerGroup.shutdownGracefully();
-      workerGroup = null;
-    }
-  }
-}

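Callers normally go through the pooled createClient(host, port) shown above;
createUnmanagedClient bypasses the pool. A usage sketch against the public
constructor in this file (the host and port below are placeholders, and an
already-built TransportContext is assumed):

    import java.io.IOException;
    import java.util.Collections;

    import org.apache.spark.network.TransportContext;
    import org.apache.spark.network.client.TransportClient;
    import org.apache.spark.network.client.TransportClientBootstrap;
    import org.apache.spark.network.client.TransportClientFactory;

    public final class ClientFactoryUsage {
      static TransportClient connect(TransportContext context) throws IOException {
        TransportClientFactory factory = new TransportClientFactory(
          context, Collections.<TransportClientBootstrap>emptyList());
        // Repeated calls for the same host/port normally return a cached, still-active
        // client from one of the numConnectionsPerPeer slots instead of reconnecting.
        return factory.createClient("shuffle-host.example", 7337);
      }
    }

Note the double-checked locking in createClient: the cached slot is probed once
without a lock and again under the per-slot lock, so at most one thread pays
the connection cost for any given slot.
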
http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/network/common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java
----------------------------------------------------------------------
diff --git a/network/common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java b/network/common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java
deleted file mode 100644
index f0e2004..0000000
--- a/network/common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java
+++ /dev/null
@@ -1,251 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.client;
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.atomic.AtomicLong;
-
-import com.google.common.annotations.VisibleForTesting;
-import io.netty.channel.Channel;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.spark.network.protocol.ChunkFetchFailure;
-import org.apache.spark.network.protocol.ChunkFetchSuccess;
-import org.apache.spark.network.protocol.ResponseMessage;
-import org.apache.spark.network.protocol.RpcFailure;
-import org.apache.spark.network.protocol.RpcResponse;
-import org.apache.spark.network.protocol.StreamChunkId;
-import org.apache.spark.network.protocol.StreamFailure;
-import org.apache.spark.network.protocol.StreamResponse;
-import org.apache.spark.network.server.MessageHandler;
-import org.apache.spark.network.util.NettyUtils;
-import org.apache.spark.network.util.TransportFrameDecoder;
-
-/**
- * Handler that processes server responses, in response to requests issued from a
- * {@link TransportClient}. It works by tracking the list of outstanding requests (and their callbacks).
- *
- * Concurrency: thread safe and can be called from multiple threads.
- */
-public class TransportResponseHandler extends MessageHandler<ResponseMessage> {
-  private final Logger logger = LoggerFactory.getLogger(TransportResponseHandler.class);
-
-  private final Channel channel;
-
-  private final Map<StreamChunkId, ChunkReceivedCallback> outstandingFetches;
-
-  private final Map<Long, RpcResponseCallback> outstandingRpcs;
-
-  private final Queue<StreamCallback> streamCallbacks;
-  private volatile boolean streamActive;
-
-  /** Records the time (in system nanoseconds) that the last fetch or RPC request was sent. */
-  private final AtomicLong timeOfLastRequestNs;
-
-  public TransportResponseHandler(Channel channel) {
-    this.channel = channel;
-    this.outstandingFetches = new ConcurrentHashMap<StreamChunkId, ChunkReceivedCallback>();
-    this.outstandingRpcs = new ConcurrentHashMap<Long, RpcResponseCallback>();
-    this.streamCallbacks = new ConcurrentLinkedQueue<StreamCallback>();
-    this.timeOfLastRequestNs = new AtomicLong(0);
-  }
-
-  public void addFetchRequest(StreamChunkId streamChunkId, ChunkReceivedCallback callback) {
-    updateTimeOfLastRequest();
-    outstandingFetches.put(streamChunkId, callback);
-  }
-
-  public void removeFetchRequest(StreamChunkId streamChunkId) {
-    outstandingFetches.remove(streamChunkId);
-  }
-
-  public void addRpcRequest(long requestId, RpcResponseCallback callback) {
-    updateTimeOfLastRequest();
-    outstandingRpcs.put(requestId, callback);
-  }
-
-  public void removeRpcRequest(long requestId) {
-    outstandingRpcs.remove(requestId);
-  }
-
-  public void addStreamCallback(StreamCallback callback) {
-    timeOfLastRequestNs.set(System.nanoTime());
-    streamCallbacks.offer(callback);
-  }
-
-  @VisibleForTesting
-  public void deactivateStream() {
-    streamActive = false;
-  }
-
-  /**
-   * Fire the failure callback for all outstanding requests. This is called when we have an
-   * uncaught exception or premature connection termination.
-   */
-  private void failOutstandingRequests(Throwable cause) {
-    for (Map.Entry<StreamChunkId, ChunkReceivedCallback> entry : outstandingFetches.entrySet()) {
-      entry.getValue().onFailure(entry.getKey().chunkIndex, cause);
-    }
-    for (Map.Entry<Long, RpcResponseCallback> entry : outstandingRpcs.entrySet()) {
-      entry.getValue().onFailure(cause);
-    }
-
-    // It's OK if new fetches appear, as they will fail immediately.
-    outstandingFetches.clear();
-    outstandingRpcs.clear();
-  }
-
-  @Override
-  public void channelActive() {
-  }
-
-  @Override
-  public void channelInactive() {
-    if (numOutstandingRequests() > 0) {
-      String remoteAddress = NettyUtils.getRemoteAddress(channel);
-      logger.error("Still have {} requests outstanding when connection from {} is closed",
-        numOutstandingRequests(), remoteAddress);
-      failOutstandingRequests(new IOException("Connection from " + remoteAddress + " closed"));
-    }
-  }
-
-  @Override
-  public void exceptionCaught(Throwable cause) {
-    if (numOutstandingRequests() > 0) {
-      String remoteAddress = NettyUtils.getRemoteAddress(channel);
-      logger.error("Still have {} requests outstanding when connection from {} is closed",
-        numOutstandingRequests(), remoteAddress);
-      failOutstandingRequests(cause);
-    }
-  }
-
-  @Override
-  public void handle(ResponseMessage message) throws Exception {
-    String remoteAddress = NettyUtils.getRemoteAddress(channel);
-    if (message instanceof ChunkFetchSuccess) {
-      ChunkFetchSuccess resp = (ChunkFetchSuccess) message;
-      ChunkReceivedCallback listener = outstandingFetches.get(resp.streamChunkId);
-      if (listener == null) {
-        logger.warn("Ignoring response for block {} from {} since it is not outstanding",
-          resp.streamChunkId, remoteAddress);
-        resp.body().release();
-      } else {
-        outstandingFetches.remove(resp.streamChunkId);
-        listener.onSuccess(resp.streamChunkId.chunkIndex, resp.body());
-        resp.body().release();
-      }
-    } else if (message instanceof ChunkFetchFailure) {
-      ChunkFetchFailure resp = (ChunkFetchFailure) message;
-      ChunkReceivedCallback listener = outstandingFetches.get(resp.streamChunkId);
-      if (listener == null) {
-        logger.warn("Ignoring response for block {} from {} ({}) since it is not outstanding",
-          resp.streamChunkId, remoteAddress, resp.errorString);
-      } else {
-        outstandingFetches.remove(resp.streamChunkId);
-        listener.onFailure(resp.streamChunkId.chunkIndex, new ChunkFetchFailureException(
-          "Failure while fetching " + resp.streamChunkId + ": " + resp.errorString));
-      }
-    } else if (message instanceof RpcResponse) {
-      RpcResponse resp = (RpcResponse) message;
-      RpcResponseCallback listener = outstandingRpcs.get(resp.requestId);
-      if (listener == null) {
-        logger.warn("Ignoring response for RPC {} from {} ({} bytes) since it is not outstanding",
-          resp.requestId, remoteAddress, resp.body().size());
-      } else {
-        outstandingRpcs.remove(resp.requestId);
-        try {
-          listener.onSuccess(resp.body().nioByteBuffer());
-        } finally {
-          resp.body().release();
-        }
-      }
-    } else if (message instanceof RpcFailure) {
-      RpcFailure resp = (RpcFailure) message;
-      RpcResponseCallback listener = outstandingRpcs.get(resp.requestId);
-      if (listener == null) {
-        logger.warn("Ignoring response for RPC {} from {} ({}) since it is not outstanding",
-          resp.requestId, remoteAddress, resp.errorString);
-      } else {
-        outstandingRpcs.remove(resp.requestId);
-        listener.onFailure(new RuntimeException(resp.errorString));
-      }
-    } else if (message instanceof StreamResponse) {
-      StreamResponse resp = (StreamResponse) message;
-      StreamCallback callback = streamCallbacks.poll();
-      if (callback != null) {
-        if (resp.byteCount > 0) {
-          StreamInterceptor interceptor = new StreamInterceptor(this, resp.streamId, resp.byteCount,
-            callback);
-          try {
-            TransportFrameDecoder frameDecoder = (TransportFrameDecoder)
-              channel.pipeline().get(TransportFrameDecoder.HANDLER_NAME);
-            frameDecoder.setInterceptor(interceptor);
-            streamActive = true;
-          } catch (Exception e) {
-            logger.error("Error installing stream handler.", e);
-            deactivateStream();
-          }
-        } else {
-          try {
-            callback.onComplete(resp.streamId);
-          } catch (Exception e) {
-            logger.warn("Error in stream handler onComplete().", e);
-          }
-        }
-      } else {
-        logger.error("Could not find callback for StreamResponse.");
-      }
-    } else if (message instanceof StreamFailure) {
-      StreamFailure resp = (StreamFailure) message;
-      StreamCallback callback = streamCallbacks.poll();
-      if (callback != null) {
-        try {
-          callback.onFailure(resp.streamId, new RuntimeException(resp.error));
-        } catch (IOException ioe) {
-          logger.warn("Error in stream failure handler.", ioe);
-        }
-      } else {
-        logger.warn("Stream failure with unknown callback: {}", resp.error);
-      }
-    } else {
-      throw new IllegalStateException("Unknown response type: " + message.type());
-    }
-  }
-
-  /** Returns the total number of outstanding requests (fetch requests + RPCs). */
-  public int numOutstandingRequests() {
-    return outstandingFetches.size() + outstandingRpcs.size() + streamCallbacks.size() +
-      (streamActive ? 1 : 0);
-  }
-
-  /** Returns the time in nanoseconds of when the last request was sent out. */
-  public long getTimeOfLastRequestNs() {
-    return timeOfLastRequestNs.get();
-  }
-
-  /** Updates the time of the last request to the current system time. */
-  public void updateTimeOfLastRequest() {
-    timeOfLastRequestNs.set(System.nanoTime());
-  }
-
-}

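The handler's bookkeeping reduces to one pattern: register a callback under the
request's id before the bytes leave, fire it exactly once on the matching
response, and fail everything still in the map when the channel dies. A
standalone sketch of that pattern (hypothetical names, not the Spark classes):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    /** Hypothetical stand-in illustrating the outstanding-request bookkeeping. */
    final class OutstandingRequests<T> {
      interface Callback<V> {
        void onSuccess(V response);
        void onFailure(Throwable cause);
      }

      private final Map<Long, Callback<T>> outstanding =
        new ConcurrentHashMap<Long, Callback<T>>();

      void register(long requestId, Callback<T> cb) {
        outstanding.put(requestId, cb);    // registered before the request is written
      }

      void complete(long requestId, T response) {
        Callback<T> cb = outstanding.remove(requestId);
        if (cb != null) {
          cb.onSuccess(response);          // remove-then-fire: at most once
        }
      }

      void failAll(Throwable cause) {
        // Mirrors failOutstandingRequests(): on connection loss every pending
        // callback is failed, then the map is cleared.
        for (Callback<T> cb : outstanding.values()) {
          cb.onFailure(cause);
        }
        outstanding.clear();
      }
    }
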
http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/network/common/src/main/java/org/apache/spark/network/protocol/AbstractMessage.java
----------------------------------------------------------------------
diff --git a/network/common/src/main/java/org/apache/spark/network/protocol/AbstractMessage.java b/network/common/src/main/java/org/apache/spark/network/protocol/AbstractMessage.java
deleted file mode 100644
index 2924218..0000000
--- a/network/common/src/main/java/org/apache/spark/network/protocol/AbstractMessage.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.protocol;
-
-import com.google.common.base.Objects;
-
-import org.apache.spark.network.buffer.ManagedBuffer;
-
-/**
- * Abstract class for messages which optionally contain a body kept in a separate buffer.
- */
-public abstract class AbstractMessage implements Message {
-  private final ManagedBuffer body;
-  private final boolean isBodyInFrame;
-
-  protected AbstractMessage() {
-    this(null, false);
-  }
-
-  protected AbstractMessage(ManagedBuffer body, boolean isBodyInFrame) {
-    this.body = body;
-    this.isBodyInFrame = isBodyInFrame;
-  }
-
-  @Override
-  public ManagedBuffer body() {
-    return body;
-  }
-
-  @Override
-  public boolean isBodyInFrame() {
-    return isBodyInFrame;
-  }
-
-  protected boolean equals(AbstractMessage other) {
-    return isBodyInFrame == other.isBodyInFrame && Objects.equal(body, other.body);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/network/common/src/main/java/org/apache/spark/network/protocol/AbstractResponseMessage.java
----------------------------------------------------------------------
diff --git a/network/common/src/main/java/org/apache/spark/network/protocol/AbstractResponseMessage.java b/network/common/src/main/java/org/apache/spark/network/protocol/AbstractResponseMessage.java
deleted file mode 100644
index c362c92..0000000
--- a/network/common/src/main/java/org/apache/spark/network/protocol/AbstractResponseMessage.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.protocol;
-
-import org.apache.spark.network.buffer.ManagedBuffer;
-
-/**
- * Abstract class for response messages.
- */
-public abstract class AbstractResponseMessage extends AbstractMessage implements ResponseMessage {
-
-  protected AbstractResponseMessage(ManagedBuffer body, boolean isBodyInFrame) {
-    super(body, isBodyInFrame);
-  }
-
-  public abstract ResponseMessage createFailureResponse(String error);
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/network/common/src/main/java/org/apache/spark/network/protocol/ChunkFetchFailure.java
----------------------------------------------------------------------
diff --git a/network/common/src/main/java/org/apache/spark/network/protocol/ChunkFetchFailure.java b/network/common/src/main/java/org/apache/spark/network/protocol/ChunkFetchFailure.java
deleted file mode 100644
index 7b28a9a..0000000
--- a/network/common/src/main/java/org/apache/spark/network/protocol/ChunkFetchFailure.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.protocol;
-
-import com.google.common.base.Objects;
-import io.netty.buffer.ByteBuf;
-
-/**
- * Response to {@link ChunkFetchRequest} when there is an error fetching the chunk.
- */
-public final class ChunkFetchFailure extends AbstractMessage implements ResponseMessage {
-  public final StreamChunkId streamChunkId;
-  public final String errorString;
-
-  public ChunkFetchFailure(StreamChunkId streamChunkId, String errorString) {
-    this.streamChunkId = streamChunkId;
-    this.errorString = errorString;
-  }
-
-  @Override
-  public Type type() { return Type.ChunkFetchFailure; }
-
-  @Override
-  public int encodedLength() {
-    return streamChunkId.encodedLength() + Encoders.Strings.encodedLength(errorString);
-  }
-
-  @Override
-  public void encode(ByteBuf buf) {
-    streamChunkId.encode(buf);
-    Encoders.Strings.encode(buf, errorString);
-  }
-
-  public static ChunkFetchFailure decode(ByteBuf buf) {
-    StreamChunkId streamChunkId = StreamChunkId.decode(buf);
-    String errorString = Encoders.Strings.decode(buf);
-    return new ChunkFetchFailure(streamChunkId, errorString);
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects.hashCode(streamChunkId, errorString);
-  }
-
-  @Override
-  public boolean equals(Object other) {
-    if (other instanceof ChunkFetchFailure) {
-      ChunkFetchFailure o = (ChunkFetchFailure) other;
-      return streamChunkId.equals(o.streamChunkId) && errorString.equals(o.errorString);
-    }
-    return false;
-  }
-
-  @Override
-  public String toString() {
-    return Objects.toStringHelper(this)
-      .add("streamChunkId", streamChunkId)
-      .add("errorString", errorString)
-      .toString();
-  }
-}

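Since encode and decode are exact inverses over the ByteBuf, a round trip is
easy to sketch with an unpooled buffer (this assumes the two-argument
StreamChunkId(streamId, chunkIndex) constructor from this module):

    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;

    import org.apache.spark.network.protocol.ChunkFetchFailure;
    import org.apache.spark.network.protocol.StreamChunkId;

    public final class ChunkFetchFailureRoundTrip {
      public static void main(String[] args) {
        ChunkFetchFailure original =
          new ChunkFetchFailure(new StreamChunkId(7L, 2), "disk read failed");

        // encodedLength() sizes the buffer for the encoded header exactly.
        ByteBuf buf = Unpooled.buffer(original.encodedLength());
        original.encode(buf);

        ChunkFetchFailure decoded = ChunkFetchFailure.decode(buf);
        assert original.equals(decoded);
        buf.release();
      }
    }
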
http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/network/common/src/main/java/org/apache/spark/network/protocol/ChunkFetchRequest.java
----------------------------------------------------------------------
diff --git a/network/common/src/main/java/org/apache/spark/network/protocol/ChunkFetchRequest.java b/network/common/src/main/java/org/apache/spark/network/protocol/ChunkFetchRequest.java
deleted file mode 100644
index 26d063f..0000000
--- a/network/common/src/main/java/org/apache/spark/network/protocol/ChunkFetchRequest.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.protocol;
-
-import com.google.common.base.Objects;
-import io.netty.buffer.ByteBuf;
-
-/**
- * Request to fetch a single chunk of a stream. This will correspond to a single
- * {@link org.apache.spark.network.protocol.ResponseMessage} (either success or failure).
- */
-public final class ChunkFetchRequest extends AbstractMessage implements RequestMessage {
-  public final StreamChunkId streamChunkId;
-
-  public ChunkFetchRequest(StreamChunkId streamChunkId) {
-    this.streamChunkId = streamChunkId;
-  }
-
-  @Override
-  public Type type() { return Type.ChunkFetchRequest; }
-
-  @Override
-  public int encodedLength() {
-    return streamChunkId.encodedLength();
-  }
-
-  @Override
-  public void encode(ByteBuf buf) {
-    streamChunkId.encode(buf);
-  }
-
-  public static ChunkFetchRequest decode(ByteBuf buf) {
-    return new ChunkFetchRequest(StreamChunkId.decode(buf));
-  }
-
-  @Override
-  public int hashCode() {
-    return streamChunkId.hashCode();
-  }
-
-  @Override
-  public boolean equals(Object other) {
-    if (other instanceof ChunkFetchRequest) {
-      ChunkFetchRequest o = (ChunkFetchRequest) other;
-      return streamChunkId.equals(o.streamChunkId);
-    }
-    return false;
-  }
-
-  @Override
-  public String toString() {
-    return Objects.toStringHelper(this)
-      .add("streamChunkId", streamChunkId)
-      .toString();
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/network/common/src/main/java/org/apache/spark/network/protocol/ChunkFetchSuccess.java
----------------------------------------------------------------------
diff --git a/network/common/src/main/java/org/apache/spark/network/protocol/ChunkFetchSuccess.java b/network/common/src/main/java/org/apache/spark/network/protocol/ChunkFetchSuccess.java
deleted file mode 100644
index 94c2ac9..0000000
--- a/network/common/src/main/java/org/apache/spark/network/protocol/ChunkFetchSuccess.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.protocol;
-
-import com.google.common.base.Objects;
-import io.netty.buffer.ByteBuf;
-
-import org.apache.spark.network.buffer.ManagedBuffer;
-import org.apache.spark.network.buffer.NettyManagedBuffer;
-
-/**
- * Response to {@link ChunkFetchRequest} when a chunk exists and has been successfully fetched.
- *
- * Note that the server-side encoding of this message does NOT include the buffer itself, as this
- * may be written by Netty in a more efficient manner (i.e., zero-copy write).
- * Similarly, the client-side decoding will reuse the Netty ByteBuf as the buffer.
- */
-public final class ChunkFetchSuccess extends AbstractResponseMessage {
-  public final StreamChunkId streamChunkId;
-
-  public ChunkFetchSuccess(StreamChunkId streamChunkId, ManagedBuffer buffer) {
-    super(buffer, true);
-    this.streamChunkId = streamChunkId;
-  }
-
-  @Override
-  public Type type() { return Type.ChunkFetchSuccess; }
-
-  @Override
-  public int encodedLength() {
-    return streamChunkId.encodedLength();
-  }
-
-  /** Encoding does NOT include 'buffer' itself. See {@link MessageEncoder}. */
-  @Override
-  public void encode(ByteBuf buf) {
-    streamChunkId.encode(buf);
-  }
-
-  @Override
-  public ResponseMessage createFailureResponse(String error) {
-    return new ChunkFetchFailure(streamChunkId, error);
-  }
-
-  /** Decoding uses the given ByteBuf as our data, and will retain() it. */
-  public static ChunkFetchSuccess decode(ByteBuf buf) {
-    StreamChunkId streamChunkId = StreamChunkId.decode(buf);
-    buf.retain();
-    NettyManagedBuffer managedBuf = new NettyManagedBuffer(buf.duplicate());
-    return new ChunkFetchSuccess(streamChunkId, managedBuf);
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects.hashCode(streamChunkId, body());
-  }
-
-  @Override
-  public boolean equals(Object other) {
-    if (other instanceof ChunkFetchSuccess) {
-      ChunkFetchSuccess o = (ChunkFetchSuccess) other;
-      return streamChunkId.equals(o.streamChunkId) && super.equals(o);
-    }
-    return false;
-  }
-
-  @Override
-  public String toString() {
-    return Objects.toStringHelper(this)
-      .add("streamChunkId", streamChunkId)
-      .add("buffer", body())
-      .toString();
-  }
-}

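The retain()-then-duplicate() step in decode() above is the client-side half of
the zero-copy story: the chunk body shares the frame's memory instead of
copying it, at the cost of explicit reference counting. A minimal Netty-only
sketch of that pattern:

    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;

    public final class RetainDuplicateSketch {
      public static void main(String[] args) {
        ByteBuf frame = Unpooled.copiedBuffer(new byte[] { 1, 2, 3, 4 });

        // Same pattern as ChunkFetchSuccess.decode(): bump the refcount, then hand
        // out a duplicate that shares the underlying memory (no byte copy).
        frame.retain();
        ByteBuf body = frame.duplicate();

        // The consumer releases its reference when done (cf. resp.body().release()).
        body.release();   // drops the reference taken by retain()
        frame.release();  // drops the original reference; memory is now reclaimable
      }
    }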
