Posted to commits@spark.apache.org by rx...@apache.org on 2014/11/01 22:37:50 UTC

[2/3] [SPARK-3796] Create external service which can serve shuffle files

http://git-wip-us.apache.org/repos/asf/spark/blob/f55218ae/network/common/src/main/java/org/apache/spark/network/server/DefaultStreamManager.java
----------------------------------------------------------------------
diff --git a/network/common/src/main/java/org/apache/spark/network/server/DefaultStreamManager.java b/network/common/src/main/java/org/apache/spark/network/server/DefaultStreamManager.java
deleted file mode 100644
index 9688705..0000000
--- a/network/common/src/main/java/org/apache/spark/network/server/DefaultStreamManager.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.server;
-
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Random;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.spark.network.buffer.ManagedBuffer;
-
-/**
- * StreamManager which allows registration of an Iterator<ManagedBuffer>, which are individually
- * fetched as chunks by the client.
- */
-public class DefaultStreamManager extends StreamManager {
-  private final Logger logger = LoggerFactory.getLogger(DefaultStreamManager.class);
-
-  private final AtomicLong nextStreamId;
-  private final Map<Long, StreamState> streams;
-
-  /** State of a single stream. */
-  private static class StreamState {
-    final Iterator<ManagedBuffer> buffers;
-
-    // Used to keep track of the index of the buffer that the user has retrieved, just to ensure
-    // that the caller only requests each chunk one at a time, in order.
-    int curChunk = 0;
-
-    StreamState(Iterator<ManagedBuffer> buffers) {
-      this.buffers = buffers;
-    }
-  }
-
-  public DefaultStreamManager() {
-    // For debugging purposes, start with a random stream id to help identifying different streams.
-    // This does not need to be globally unique, only unique to this class.
-    nextStreamId = new AtomicLong((long) new Random().nextInt(Integer.MAX_VALUE) * 1000);
-    streams = new ConcurrentHashMap<Long, StreamState>();
-  }
-
-  @Override
-  public ManagedBuffer getChunk(long streamId, int chunkIndex) {
-    StreamState state = streams.get(streamId);
-    if (chunkIndex != state.curChunk) {
-      throw new IllegalStateException(String.format(
-        "Received out-of-order chunk index %s (expected %s)", chunkIndex, state.curChunk));
-    } else if (!state.buffers.hasNext()) {
-      throw new IllegalStateException(String.format(
-        "Requested chunk index beyond end %s", chunkIndex));
-    }
-    state.curChunk += 1;
-    ManagedBuffer nextChunk = state.buffers.next();
-
-    if (!state.buffers.hasNext()) {
-      logger.trace("Removing stream id {}", streamId);
-      streams.remove(streamId);
-    }
-
-    return nextChunk;
-  }
-
-  @Override
-  public void connectionTerminated(long streamId) {
-    // Release all remaining buffers.
-    StreamState state = streams.remove(streamId);
-    if (state != null && state.buffers != null) {
-      while (state.buffers.hasNext()) {
-        state.buffers.next().release();
-      }
-    }
-  }
-
-  /**
-   * Registers a stream of ManagedBuffers which are served as individual chunks one at a time to
-   * callers. Each ManagedBuffer will be release()'d after it is transferred on the wire. If a
-   * client connection is closed before the iterator is fully drained, then the remaining buffers
-   * will all be release()'d.
-   */
-  public long registerStream(Iterator<ManagedBuffer> buffers) {
-    long myStreamId = nextStreamId.getAndIncrement();
-    streams.put(myStreamId, new StreamState(buffers));
-    return myStreamId;
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/f55218ae/network/common/src/main/java/org/apache/spark/network/server/NoOpRpcHandler.java
----------------------------------------------------------------------
diff --git a/network/common/src/main/java/org/apache/spark/network/server/NoOpRpcHandler.java b/network/common/src/main/java/org/apache/spark/network/server/NoOpRpcHandler.java
new file mode 100644
index 0000000..5a3f003
--- /dev/null
+++ b/network/common/src/main/java/org/apache/spark/network/server/NoOpRpcHandler.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.server;
+
+import org.apache.spark.network.client.RpcResponseCallback;
+import org.apache.spark.network.client.TransportClient;
+
+/** An RpcHandler suitable for a client-only TransportContext, which cannot receive RPCs. */
+public class NoOpRpcHandler implements RpcHandler {
+  private final StreamManager streamManager;
+
+  public NoOpRpcHandler() {
+    streamManager = new OneForOneStreamManager();
+  }
+
+  @Override
+  public void receive(TransportClient client, byte[] message, RpcResponseCallback callback) {
+    throw new UnsupportedOperationException("Cannot handle messages");
+  }
+
+  @Override
+  public StreamManager getStreamManager() { return streamManager; }
+}

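For illustration, a minimal sketch of wiring a client-only TransportContext with the new NoOpRpcHandler (the host and port below are placeholders):

    TransportConf conf = new TransportConf(new SystemPropertyConfigProvider());
    // NoOpRpcHandler rejects inbound RPCs but still supplies a stream manager,
    // so the context can be used purely for outbound connections.
    TransportContext context = new TransportContext(conf, new NoOpRpcHandler());
    TransportClientFactory clientFactory = context.createClientFactory();
    TransportClient client = clientFactory.createClient("shuffle-host", 7337);
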
http://git-wip-us.apache.org/repos/asf/spark/blob/f55218ae/network/common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java
----------------------------------------------------------------------
diff --git a/network/common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java b/network/common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java
new file mode 100644
index 0000000..731d48d
--- /dev/null
+++ b/network/common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.server;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.spark.network.buffer.ManagedBuffer;
+
+/**
+ * StreamManager which allows registration of an Iterator<ManagedBuffer>, whose buffers are
+ * individually fetched as chunks by the client. Each registered buffer is one chunk.
+ */
+public class OneForOneStreamManager extends StreamManager {
+  private final Logger logger = LoggerFactory.getLogger(OneForOneStreamManager.class);
+
+  private final AtomicLong nextStreamId;
+  private final Map<Long, StreamState> streams;
+
+  /** State of a single stream. */
+  private static class StreamState {
+    final Iterator<ManagedBuffer> buffers;
+
+    // Index of the next chunk to be retrieved, used to ensure that callers request
+    // chunks one at a time and in order.
+    int curChunk = 0;
+
+    StreamState(Iterator<ManagedBuffer> buffers) {
+      this.buffers = buffers;
+    }
+  }
+
+  public OneForOneStreamManager() {
+    // For debugging purposes, start with a random stream id to help identify different streams.
+    // This does not need to be globally unique, only unique to this class.
+    nextStreamId = new AtomicLong((long) new Random().nextInt(Integer.MAX_VALUE) * 1000);
+    streams = new ConcurrentHashMap<Long, StreamState>();
+  }
+
+  @Override
+  public ManagedBuffer getChunk(long streamId, int chunkIndex) {
+    StreamState state = streams.get(streamId);
+    if (chunkIndex != state.curChunk) {
+      throw new IllegalStateException(String.format(
+        "Received out-of-order chunk index %s (expected %s)", chunkIndex, state.curChunk));
+    } else if (!state.buffers.hasNext()) {
+      throw new IllegalStateException(String.format(
+        "Requested chunk index beyond end %s", chunkIndex));
+    }
+    state.curChunk += 1;
+    ManagedBuffer nextChunk = state.buffers.next();
+
+    if (!state.buffers.hasNext()) {
+      logger.trace("Removing stream id {}", streamId);
+      streams.remove(streamId);
+    }
+
+    return nextChunk;
+  }
+
+  @Override
+  public void connectionTerminated(long streamId) {
+    // Release all remaining buffers.
+    StreamState state = streams.remove(streamId);
+    if (state != null && state.buffers != null) {
+      while (state.buffers.hasNext()) {
+        state.buffers.next().release();
+      }
+    }
+  }
+
+  /**
+   * Registers a stream of ManagedBuffers which are served as individual chunks one at a time to
+   * callers. Each ManagedBuffer will be release()'d after it is transferred on the wire. If a
+   * client connection is closed before the iterator is fully drained, then the remaining buffers
+   * will all be release()'d.
+   */
+  public long registerStream(Iterator<ManagedBuffer> buffers) {
+    long myStreamId = nextStreamId.getAndIncrement();
+    streams.put(myStreamId, new StreamState(buffers));
+    return myStreamId;
+  }
+}

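A sketch of the one-for-one contract, assuming NioManagedBuffer wraps a ByteBuffer as elsewhere in this module:

    OneForOneStreamManager streamManager = new OneForOneStreamManager();
    // Each registered buffer becomes exactly one chunk of the stream.
    List<ManagedBuffer> buffers = Lists.<ManagedBuffer>newArrayList(
      new NioManagedBuffer(ByteBuffer.wrap(new byte[] { 1, 2 })),
      new NioManagedBuffer(ByteBuffer.wrap(new byte[] { 3, 4 })));
    long streamId = streamManager.registerStream(buffers.iterator());

    // Chunks must be requested in order; asking for chunk 1 before chunk 0
    // throws IllegalStateException. The stream is dropped after its last chunk.
    ManagedBuffer chunk0 = streamManager.getChunk(streamId, 0);
    ManagedBuffer chunk1 = streamManager.getChunk(streamId, 1);
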
http://git-wip-us.apache.org/repos/asf/spark/blob/f55218ae/network/common/src/main/java/org/apache/spark/network/server/RpcHandler.java
----------------------------------------------------------------------
diff --git a/network/common/src/main/java/org/apache/spark/network/server/RpcHandler.java b/network/common/src/main/java/org/apache/spark/network/server/RpcHandler.java
index f54a696..2369dc6 100644
--- a/network/common/src/main/java/org/apache/spark/network/server/RpcHandler.java
+++ b/network/common/src/main/java/org/apache/spark/network/server/RpcHandler.java
@@ -35,4 +35,10 @@ public interface RpcHandler {
    *                 RPC.
    */
   void receive(TransportClient client, byte[] message, RpcResponseCallback callback);
+
+  /**
+   * Returns the StreamManager which contains the state about which streams are currently being
+   * fetched by a TransportClient.
+   */
+  StreamManager getStreamManager();
 }

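A minimal sketch of an implementation satisfying the extended interface (an echo handler, purely illustrative):

    RpcHandler echoHandler = new RpcHandler() {
      private final StreamManager streamManager = new OneForOneStreamManager();

      @Override
      public void receive(TransportClient client, byte[] message, RpcResponseCallback callback) {
        callback.onSuccess(message); // echo the RPC payload back to the sender
      }

      @Override
      public StreamManager getStreamManager() { return streamManager; }
    };
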
http://git-wip-us.apache.org/repos/asf/spark/blob/f55218ae/network/common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
----------------------------------------------------------------------
diff --git a/network/common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java b/network/common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
index 352f865..17fe900 100644
--- a/network/common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
+++ b/network/common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
@@ -56,24 +56,23 @@ public class TransportRequestHandler extends MessageHandler<RequestMessage> {
   /** Client on the same channel allowing us to talk back to the requester. */
   private final TransportClient reverseClient;
 
-  /** Returns each chunk part of a stream. */
-  private final StreamManager streamManager;
-
   /** Handles all RPC messages. */
   private final RpcHandler rpcHandler;
 
+  /** Returns each chunk that is part of a stream. */
+  private final StreamManager streamManager;
+
   /** List of all stream ids that have been read on this handler, used for cleanup. */
   private final Set<Long> streamIds;
 
   public TransportRequestHandler(
       Channel channel,
       TransportClient reverseClient,
-      StreamManager streamManager,
       RpcHandler rpcHandler) {
     this.channel = channel;
     this.reverseClient = reverseClient;
-    this.streamManager = streamManager;
     this.rpcHandler = rpcHandler;
+    this.streamManager = rpcHandler.getStreamManager();
     this.streamIds = Sets.newHashSet();
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/f55218ae/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java
----------------------------------------------------------------------
diff --git a/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java b/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java
index 2430707..d1a1877 100644
--- a/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java
+++ b/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java
@@ -49,11 +49,11 @@ public class TransportServer implements Closeable {
   private ChannelFuture channelFuture;
   private int port = -1;
 
-  public TransportServer(TransportContext context) {
+  public TransportServer(TransportContext context, int portToBind) {
     this.context = context;
     this.conf = context.getConf();
 
-    init();
+    init(portToBind);
   }
 
   public int getPort() {
@@ -63,7 +63,7 @@ public class TransportServer implements Closeable {
     return port;
   }
 
-  private void init() {
+  private void init(int portToBind) {
 
     IOMode ioMode = IOMode.valueOf(conf.ioMode());
     EventLoopGroup bossGroup =
@@ -95,7 +95,7 @@ public class TransportServer implements Closeable {
       }
     });
 
-    channelFuture = bootstrap.bind(new InetSocketAddress(conf.serverPort()));
+    channelFuture = bootstrap.bind(new InetSocketAddress(portToBind));
     channelFuture.syncUninterruptibly();
 
     port = ((InetSocketAddress) channelFuture.channel().localAddress()).getPort();

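With the port now passed explicitly, a server can be bound to a chosen port; 0 requests an ephemeral port, reported by getPort(). A sketch:

    TransportConf conf = new TransportConf(new SystemPropertyConfigProvider());
    TransportContext context = new TransportContext(conf, new NoOpRpcHandler());
    // Bind explicitly rather than reading the removed spark.shuffle.io.port.
    TransportServer server = new TransportServer(context, 0);
    int boundPort = server.getPort(); // the actual ephemeral port
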
http://git-wip-us.apache.org/repos/asf/spark/blob/f55218ae/network/common/src/main/java/org/apache/spark/network/util/JavaUtils.java
----------------------------------------------------------------------
diff --git a/network/common/src/main/java/org/apache/spark/network/util/JavaUtils.java b/network/common/src/main/java/org/apache/spark/network/util/JavaUtils.java
index 32ba3f5..40b71b0 100644
--- a/network/common/src/main/java/org/apache/spark/network/util/JavaUtils.java
+++ b/network/common/src/main/java/org/apache/spark/network/util/JavaUtils.java
@@ -17,8 +17,12 @@
 
 package org.apache.spark.network.util;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.Closeable;
 import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
 
 import com.google.common.io.Closeables;
 import org.slf4j.Logger;
@@ -35,4 +39,38 @@ public class JavaUtils {
       logger.error("IOException should not have been thrown.", e);
     }
   }
+
+  // TODO: Make this configurable, do not use Java serialization!
+  public static <T> T deserialize(byte[] bytes) {
+    try {
+      ObjectInputStream is = new ObjectInputStream(new ByteArrayInputStream(bytes));
+      Object out = is.readObject();
+      is.close();
+      return (T) out;
+    } catch (ClassNotFoundException e) {
+      throw new RuntimeException("Could not deserialize object", e);
+    } catch (IOException e) {
+      throw new RuntimeException("Could not deserialize object", e);
+    }
+  }
+
+  // TODO: Make this configurable, do not use Java serialization!
+  public static byte[] serialize(Object object) {
+    try {
+      ByteArrayOutputStream baos = new ByteArrayOutputStream();
+      ObjectOutputStream os = new ObjectOutputStream(baos);
+      os.writeObject(object);
+      os.close();
+      return baos.toByteArray();
+    } catch (IOException e) {
+      throw new RuntimeException("Could not serialize object", e);
+    }
+  }
+
+  /** Returns a hash consistent with Spark's Utils.nonNegativeHash(). */
+  public static int nonNegativeHash(Object obj) {
+    if (obj == null) { return 0; }
+    int hash = obj.hashCode();
+    return hash != Integer.MIN_VALUE ? Math.abs(hash) : 0;
+  }
 }

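A short sketch of these helpers: any Serializable value round-trips, and nonNegativeHash matches Spark's Utils.nonNegativeHash() so file placement agrees with the executor's:

    byte[] bytes = JavaUtils.serialize("shuffle_0_1_2");   // any Serializable object
    String decoded = JavaUtils.deserialize(bytes);          // == "shuffle_0_1_2"
    int hash = JavaUtils.nonNegativeHash("shuffle_0_1_2");  // always >= 0
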
http://git-wip-us.apache.org/repos/asf/spark/blob/f55218ae/network/common/src/main/java/org/apache/spark/network/util/SystemPropertyConfigProvider.java
----------------------------------------------------------------------
diff --git a/network/common/src/main/java/org/apache/spark/network/util/SystemPropertyConfigProvider.java b/network/common/src/main/java/org/apache/spark/network/util/SystemPropertyConfigProvider.java
new file mode 100644
index 0000000..5f20b70
--- /dev/null
+++ b/network/common/src/main/java/org/apache/spark/network/util/SystemPropertyConfigProvider.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.util;
+
+import java.util.NoSuchElementException;
+
+import org.apache.spark.network.util.ConfigProvider;
+
+/** Uses System properties to obtain config values. */
+public class SystemPropertyConfigProvider extends ConfigProvider {
+  @Override
+  public String get(String name) {
+    String value = System.getProperty(name);
+    if (value == null) {
+      throw new NoSuchElementException(name);
+    }
+    return value;
+  }
+}

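Usage sketch: values come straight from system properties, and a missing key raises NoSuchElementException:

    System.setProperty("spark.shuffle.io.mode", "nio");
    TransportConf conf = new TransportConf(new SystemPropertyConfigProvider());
    String ioMode = conf.ioMode(); // "NIO" (upper-cased by TransportConf)
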
http://git-wip-us.apache.org/repos/asf/spark/blob/f55218ae/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java
----------------------------------------------------------------------
diff --git a/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java b/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java
index 80f65d9..a68f38e 100644
--- a/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java
+++ b/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java
@@ -27,9 +27,6 @@ public class TransportConf {
     this.conf = conf;
   }
 
-  /** Port the server listens on. Default to a random port. */
-  public int serverPort() { return conf.getInt("spark.shuffle.io.port", 0); }
-
   /** IO mode: nio or epoll */
   public String ioMode() { return conf.get("spark.shuffle.io.mode", "NIO").toUpperCase(); }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/f55218ae/network/common/src/test/java/org/apache/spark/network/ChunkFetchIntegrationSuite.java
----------------------------------------------------------------------
diff --git a/network/common/src/test/java/org/apache/spark/network/ChunkFetchIntegrationSuite.java b/network/common/src/test/java/org/apache/spark/network/ChunkFetchIntegrationSuite.java
index 738dca9..c415883 100644
--- a/network/common/src/test/java/org/apache/spark/network/ChunkFetchIntegrationSuite.java
+++ b/network/common/src/test/java/org/apache/spark/network/ChunkFetchIntegrationSuite.java
@@ -41,10 +41,13 @@ import org.apache.spark.network.buffer.FileSegmentManagedBuffer;
 import org.apache.spark.network.buffer.ManagedBuffer;
 import org.apache.spark.network.buffer.NioManagedBuffer;
 import org.apache.spark.network.client.ChunkReceivedCallback;
+import org.apache.spark.network.client.RpcResponseCallback;
 import org.apache.spark.network.client.TransportClient;
 import org.apache.spark.network.client.TransportClientFactory;
+import org.apache.spark.network.server.RpcHandler;
 import org.apache.spark.network.server.TransportServer;
 import org.apache.spark.network.server.StreamManager;
+import org.apache.spark.network.util.SystemPropertyConfigProvider;
 import org.apache.spark.network.util.TransportConf;
 
 public class ChunkFetchIntegrationSuite {
@@ -93,7 +96,18 @@ public class ChunkFetchIntegrationSuite {
         }
       }
     };
-    TransportContext context = new TransportContext(conf, streamManager, new NoOpRpcHandler());
+    RpcHandler handler = new RpcHandler() {
+      @Override
+      public void receive(TransportClient client, byte[] message, RpcResponseCallback callback) {
+        throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public StreamManager getStreamManager() {
+        return streamManager;
+      }
+    };
+    TransportContext context = new TransportContext(conf, handler);
     server = context.createServer();
     clientFactory = context.createClientFactory();
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/f55218ae/network/common/src/test/java/org/apache/spark/network/NoOpRpcHandler.java
----------------------------------------------------------------------
diff --git a/network/common/src/test/java/org/apache/spark/network/NoOpRpcHandler.java b/network/common/src/test/java/org/apache/spark/network/NoOpRpcHandler.java
deleted file mode 100644
index 7aa37ef..0000000
--- a/network/common/src/test/java/org/apache/spark/network/NoOpRpcHandler.java
+++ /dev/null
@@ -1,28 +0,0 @@
-package org.apache.spark.network;/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import org.apache.spark.network.client.RpcResponseCallback;
-import org.apache.spark.network.client.TransportClient;
-import org.apache.spark.network.server.RpcHandler;
-
-/** Test RpcHandler which always returns a zero-sized success. */
-public class NoOpRpcHandler implements RpcHandler {
-  @Override
-  public void receive(TransportClient client, byte[] message, RpcResponseCallback callback) {
-    callback.onSuccess(new byte[0]);
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/f55218ae/network/common/src/test/java/org/apache/spark/network/RpcIntegrationSuite.java
----------------------------------------------------------------------
diff --git a/network/common/src/test/java/org/apache/spark/network/RpcIntegrationSuite.java b/network/common/src/test/java/org/apache/spark/network/RpcIntegrationSuite.java
index 9f216dd..64b457b 100644
--- a/network/common/src/test/java/org/apache/spark/network/RpcIntegrationSuite.java
+++ b/network/common/src/test/java/org/apache/spark/network/RpcIntegrationSuite.java
@@ -35,9 +35,11 @@ import static org.junit.Assert.*;
 import org.apache.spark.network.client.RpcResponseCallback;
 import org.apache.spark.network.client.TransportClient;
 import org.apache.spark.network.client.TransportClientFactory;
-import org.apache.spark.network.server.DefaultStreamManager;
+import org.apache.spark.network.server.OneForOneStreamManager;
 import org.apache.spark.network.server.RpcHandler;
+import org.apache.spark.network.server.StreamManager;
 import org.apache.spark.network.server.TransportServer;
+import org.apache.spark.network.util.SystemPropertyConfigProvider;
 import org.apache.spark.network.util.TransportConf;
 
 public class RpcIntegrationSuite {
@@ -61,8 +63,11 @@ public class RpcIntegrationSuite {
           throw new RuntimeException("Thrown: " + parts[1]);
         }
       }
+
+      @Override
+      public StreamManager getStreamManager() { return new OneForOneStreamManager(); }
     };
-    TransportContext context = new TransportContext(conf, new DefaultStreamManager(), rpcHandler);
+    TransportContext context = new TransportContext(conf, rpcHandler);
     server = context.createServer();
     clientFactory = context.createClientFactory();
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/f55218ae/network/common/src/test/java/org/apache/spark/network/SystemPropertyConfigProvider.java
----------------------------------------------------------------------
diff --git a/network/common/src/test/java/org/apache/spark/network/SystemPropertyConfigProvider.java b/network/common/src/test/java/org/apache/spark/network/SystemPropertyConfigProvider.java
deleted file mode 100644
index f4e0a24..0000000
--- a/network/common/src/test/java/org/apache/spark/network/SystemPropertyConfigProvider.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network;
-
-import java.util.NoSuchElementException;
-
-import org.apache.spark.network.util.ConfigProvider;
-
-/** Uses System properties to obtain config values. */
-public class SystemPropertyConfigProvider extends ConfigProvider {
-  @Override
-  public String get(String name) {
-    String value = System.getProperty(name);
-    if (value == null) {
-      throw new NoSuchElementException(name);
-    }
-    return value;
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/f55218ae/network/common/src/test/java/org/apache/spark/network/TransportClientFactorySuite.java
----------------------------------------------------------------------
diff --git a/network/common/src/test/java/org/apache/spark/network/TransportClientFactorySuite.java b/network/common/src/test/java/org/apache/spark/network/TransportClientFactorySuite.java
index 3ef9646..5a10fdb 100644
--- a/network/common/src/test/java/org/apache/spark/network/TransportClientFactorySuite.java
+++ b/network/common/src/test/java/org/apache/spark/network/TransportClientFactorySuite.java
@@ -28,11 +28,11 @@ import static org.junit.Assert.assertTrue;
 
 import org.apache.spark.network.client.TransportClient;
 import org.apache.spark.network.client.TransportClientFactory;
-import org.apache.spark.network.server.DefaultStreamManager;
+import org.apache.spark.network.server.NoOpRpcHandler;
 import org.apache.spark.network.server.RpcHandler;
 import org.apache.spark.network.server.TransportServer;
-import org.apache.spark.network.server.StreamManager;
 import org.apache.spark.network.util.JavaUtils;
+import org.apache.spark.network.util.SystemPropertyConfigProvider;
 import org.apache.spark.network.util.TransportConf;
 
 public class TransportClientFactorySuite {
@@ -44,9 +44,8 @@ public class TransportClientFactorySuite {
   @Before
   public void setUp() {
     conf = new TransportConf(new SystemPropertyConfigProvider());
-    StreamManager streamManager = new DefaultStreamManager();
     RpcHandler rpcHandler = new NoOpRpcHandler();
-    context = new TransportContext(conf, streamManager, rpcHandler);
+    context = new TransportContext(conf, rpcHandler);
     server1 = context.createServer();
     server2 = context.createServer();
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/f55218ae/network/shuffle/pom.xml
----------------------------------------------------------------------
diff --git a/network/shuffle/pom.xml b/network/shuffle/pom.xml
new file mode 100644
index 0000000..d271704
--- /dev/null
+++ b/network/shuffle/pom.xml
@@ -0,0 +1,96 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.spark</groupId>
+    <artifactId>spark-parent</artifactId>
+    <version>1.2.0-SNAPSHOT</version>
+    <relativePath>../../pom.xml</relativePath>
+  </parent>
+
+  <groupId>org.apache.spark</groupId>
+  <artifactId>spark-network-shuffle_2.10</artifactId>
+  <packaging>jar</packaging>
+  <name>Spark Project Shuffle Streaming Service Code</name>
+  <url>http://spark.apache.org/</url>
+  <properties>
+    <sbt.project.name>network-shuffle</sbt.project.name>
+  </properties>
+
+  <dependencies>
+    <!-- Core dependencies -->
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-network-common_2.10</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+
+    <!-- Provided dependencies -->
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
+    <!-- Test dependencies -->
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-network-common_2.10</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.novocode</groupId>
+      <artifactId>junit-interface</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>log4j</groupId>
+      <artifactId>log4j</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.scalatest</groupId>
+      <artifactId>scalatest_${scala.binary.version}</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+    <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+  </build>
+</project>

http://git-wip-us.apache.org/repos/asf/spark/blob/f55218ae/network/shuffle/src/main/java/org/apache/spark/network/shuffle/BlockFetchingListener.java
----------------------------------------------------------------------
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/BlockFetchingListener.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/BlockFetchingListener.java
new file mode 100644
index 0000000..138fd53
--- /dev/null
+++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/BlockFetchingListener.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+import java.util.EventListener;
+
+import org.apache.spark.network.buffer.ManagedBuffer;
+
+public interface BlockFetchingListener extends EventListener {
+  /**
+   * Called once per successfully fetched block. After this call returns, data will be released
+   * automatically. If the data will be passed to another thread, the receiver should retain()
+   * and release() the buffer on their own, or copy the data to a new buffer.
+   */
+  void onBlockFetchSuccess(String blockId, ManagedBuffer data);
+
+  /**
+   * Called at least once per block upon failure.
+   */
+  void onBlockFetchFailure(String blockId, Throwable exception);
+}

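A hedged sketch of a listener honoring the release contract above (retain()/release() reference counting on ManagedBuffer is assumed as described in the javadoc):

    BlockFetchingListener listener = new BlockFetchingListener() {
      @Override
      public void onBlockFetchSuccess(String blockId, ManagedBuffer data) {
        // The buffer is released once this call returns, so retain it before
        // handing it to another thread, which must release() it when done.
        data.retain();
        // ... hand (blockId, data) off for asynchronous processing ...
      }

      @Override
      public void onBlockFetchFailure(String blockId, Throwable exception) {
        // May fire more than once for the same block; keep this idempotent.
      }
    };
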
http://git-wip-us.apache.org/repos/asf/spark/blob/f55218ae/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExecutorShuffleInfo.java
----------------------------------------------------------------------
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExecutorShuffleInfo.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExecutorShuffleInfo.java
new file mode 100644
index 0000000..d45e646
--- /dev/null
+++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExecutorShuffleInfo.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+import java.io.Serializable;
+import java.util.Arrays;
+
+import com.google.common.base.Objects;
+
+/** Contains all configuration necessary for locating the shuffle files of an executor. */
+public class ExecutorShuffleInfo implements Serializable {
+  /** The base set of local directories that the executor stores its shuffle files in. */
+  final String[] localDirs;
+  /** Number of subdirectories created within each localDir. */
+  final int subDirsPerLocalDir;
+  /** Shuffle manager (SortShuffleManager or HashShuffleManager) that the executor is using. */
+  final String shuffleManager;
+
+  public ExecutorShuffleInfo(String[] localDirs, int subDirsPerLocalDir, String shuffleManager) {
+    this.localDirs = localDirs;
+    this.subDirsPerLocalDir = subDirsPerLocalDir;
+    this.shuffleManager = shuffleManager;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(subDirsPerLocalDir, shuffleManager) * 41 + Arrays.hashCode(localDirs);
+  }
+
+  @Override
+  public String toString() {
+    return Objects.toStringHelper(this)
+      .add("localDirs", Arrays.toString(localDirs))
+      .add("subDirsPerLocalDir", subDirsPerLocalDir)
+      .add("shuffleManager", shuffleManager)
+      .toString();
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (other != null && other instanceof ExecutorShuffleInfo) {
+      ExecutorShuffleInfo o = (ExecutorShuffleInfo) other;
+      return Arrays.equals(localDirs, o.localDirs)
+        && Objects.equal(subDirsPerLocalDir, o.subDirsPerLocalDir)
+        && Objects.equal(shuffleManager, o.shuffleManager);
+    }
+    return false;
+  }
+}

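For illustration, the payload an executor might register (paths are placeholders; 64 subdirectories mirrors what we understand to be Spark's default):

    ExecutorShuffleInfo info = new ExecutorShuffleInfo(
      new String[] { "/data/1/spark-local", "/data/2/spark-local" },
      64,  // subdirectories per local dir
      "org.apache.spark.shuffle.sort.SortShuffleManager");
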
http://git-wip-us.apache.org/repos/asf/spark/blob/f55218ae/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
----------------------------------------------------------------------
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
new file mode 100644
index 0000000..a9dff31
--- /dev/null
+++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+import java.util.List;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.spark.network.shuffle.ExternalShuffleMessages.*;
+
+import org.apache.spark.network.buffer.ManagedBuffer;
+import org.apache.spark.network.client.RpcResponseCallback;
+import org.apache.spark.network.client.TransportClient;
+import org.apache.spark.network.server.OneForOneStreamManager;
+import org.apache.spark.network.server.RpcHandler;
+import org.apache.spark.network.server.StreamManager;
+import org.apache.spark.network.util.JavaUtils;
+
+/**
+ * RPC Handler for a server which can serve shuffle blocks from outside of an Executor process.
+ *
+ * Handles registering executors and opening shuffle blocks from them. Shuffle blocks are registered
+ * with the "one-for-one" strategy, meaning each Transport-layer Chunk is equivalent to one Spark-
+ * level shuffle block.
+ */
+public class ExternalShuffleBlockHandler implements RpcHandler {
+  private final Logger logger = LoggerFactory.getLogger(ExternalShuffleBlockHandler.class);
+
+  private final ExternalShuffleBlockManager blockManager;
+  private final OneForOneStreamManager streamManager;
+
+  public ExternalShuffleBlockHandler() {
+    this(new OneForOneStreamManager(), new ExternalShuffleBlockManager());
+  }
+
+  /** Enables mocking out the StreamManager and BlockManager. */
+  @VisibleForTesting
+  ExternalShuffleBlockHandler(
+      OneForOneStreamManager streamManager,
+      ExternalShuffleBlockManager blockManager) {
+    this.streamManager = streamManager;
+    this.blockManager = blockManager;
+  }
+
+  @Override
+  public void receive(TransportClient client, byte[] message, RpcResponseCallback callback) {
+    Object msgObj = JavaUtils.deserialize(message);
+
+    logger.trace("Received message: " + msgObj);
+
+    if (msgObj instanceof OpenShuffleBlocks) {
+      OpenShuffleBlocks msg = (OpenShuffleBlocks) msgObj;
+      List<ManagedBuffer> blocks = Lists.newArrayList();
+
+      for (String blockId : msg.blockIds) {
+        blocks.add(blockManager.getBlockData(msg.appId, msg.execId, blockId));
+      }
+      long streamId = streamManager.registerStream(blocks.iterator());
+      logger.trace("Registered streamId {} with {} buffers", streamId, msg.blockIds.length);
+      callback.onSuccess(JavaUtils.serialize(
+        new ShuffleStreamHandle(streamId, msg.blockIds.length)));
+
+    } else if (msgObj instanceof RegisterExecutor) {
+      RegisterExecutor msg = (RegisterExecutor) msgObj;
+      blockManager.registerExecutor(msg.appId, msg.execId, msg.executorInfo);
+      callback.onSuccess(new byte[0]);
+
+    } else {
+      throw new UnsupportedOperationException(String.format(
+        "Unexpected message: %s (class = %s)", msgObj, msgObj.getClass()));
+    }
+  }
+
+  @Override
+  public StreamManager getStreamManager() {
+    return streamManager;
+  }
+
+  /** For testing, clears all executors registered with "RegisterExecutor". */
+  @VisibleForTesting
+  public void clearRegisteredExecutors() {
+    blockManager.clearRegisteredExecutors();
+  }
+}

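A sketch of serving the handler and opening blocks over RPC (ids are placeholders; ShuffleStreamHandle's shape is assumed from its construction above):

    TransportConf conf = new TransportConf(new SystemPropertyConfigProvider());
    TransportContext context = new TransportContext(conf, new ExternalShuffleBlockHandler());
    TransportServer server = context.createServer();

    TransportClient client =
      context.createClientFactory().createClient("localhost", server.getPort());
    byte[] response = client.sendRpcSync(JavaUtils.serialize(
      new ExternalShuffleMessages.OpenShuffleBlocks(
        "app-1", "exec-0", new String[] { "shuffle_0_0_0" })), 5000 /* timeoutMs */);
    // The handle names the stream whose chunks are the requested blocks.
    ShuffleStreamHandle handle = JavaUtils.deserialize(response);
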
http://git-wip-us.apache.org/repos/asf/spark/blob/f55218ae/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManager.java
----------------------------------------------------------------------
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManager.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManager.java
new file mode 100644
index 0000000..6589889
--- /dev/null
+++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManager.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.concurrent.ConcurrentHashMap;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.spark.network.buffer.FileSegmentManagedBuffer;
+import org.apache.spark.network.buffer.ManagedBuffer;
+import org.apache.spark.network.util.JavaUtils;
+
+/**
+ * Manages converting shuffle BlockIds into physical segments of local files, from a process outside
+ * of Executors. Each Executor must register its own configuration about where it stores its files
+ * (local dirs) and how (shuffle manager). The logic for retrieval of individual files is replicated
+ * from Spark's FileShuffleBlockManager and IndexShuffleBlockManager.
+ *
+ * Executors with shuffle file consolidation are not currently supported, as the consolidation
+ * index is kept in the Executor's memory rather than on disk (unlike IndexShuffleBlockManager's).
+ */
+public class ExternalShuffleBlockManager {
+  private final Logger logger = LoggerFactory.getLogger(ExternalShuffleBlockManager.class);
+
+  // Map from "appId-execId" to the executor's configuration.
+  private final ConcurrentHashMap<String, ExecutorShuffleInfo> executors =
+    new ConcurrentHashMap<String, ExecutorShuffleInfo>();
+
+  // Returns an id suitable for a single executor within a single application.
+  private String getAppExecId(String appId, String execId) {
+    return appId + "-" + execId;
+  }
+
+  /** Registers a new Executor with all the configuration we need to find its shuffle files. */
+  public void registerExecutor(
+      String appId,
+      String execId,
+      ExecutorShuffleInfo executorInfo) {
+    String fullId = getAppExecId(appId, execId);
+    logger.info("Registered executor {} with {}", fullId, executorInfo);
+    executors.put(fullId, executorInfo);
+  }
+
+  /**
+   * Obtains a FileSegmentManagedBuffer from a shuffle block id. We expect the blockId to have the
+   * format "shuffle_ShuffleId_MapId_ReduceId" (from ShuffleBlockId), and additionally make
+   * assumptions about how the hash and sort based shuffles store their data.
+   */
+  public ManagedBuffer getBlockData(String appId, String execId, String blockId) {
+    String[] blockIdParts = blockId.split("_");
+    if (blockIdParts.length < 4) {
+      throw new IllegalArgumentException("Unexpected block id format: " + blockId);
+    } else if (!blockIdParts[0].equals("shuffle")) {
+      throw new IllegalArgumentException("Expected shuffle block id, got: " + blockId);
+    }
+    int shuffleId = Integer.parseInt(blockIdParts[1]);
+    int mapId = Integer.parseInt(blockIdParts[2]);
+    int reduceId = Integer.parseInt(blockIdParts[3]);
+
+    ExecutorShuffleInfo executor = executors.get(getAppExecId(appId, execId));
+    if (executor == null) {
+      throw new RuntimeException(
+        String.format("Executor is not registered (appId=%s, execId=%s)", appId, execId));
+    }
+
+    if ("org.apache.spark.shuffle.hash.HashShuffleManager".equals(executor.shuffleManager)) {
+      return getHashBasedShuffleBlockData(executor, blockId);
+    } else if ("org.apache.spark.shuffle.sort.SortShuffleManager".equals(executor.shuffleManager)) {
+      return getSortBasedShuffleBlockData(executor, shuffleId, mapId, reduceId);
+    } else {
+      throw new UnsupportedOperationException(
+        "Unsupported shuffle manager: " + executor.shuffleManager);
+    }
+  }
+
+  /**
+   * Hash-based shuffle data is simply stored as one file per block.
+   * This logic is from FileShuffleBlockManager.
+   */
+  // TODO: Support consolidated hash shuffle files
+  private ManagedBuffer getHashBasedShuffleBlockData(ExecutorShuffleInfo executor, String blockId) {
+    File shuffleFile = getFile(executor.localDirs, executor.subDirsPerLocalDir, blockId);
+    return new FileSegmentManagedBuffer(shuffleFile, 0, shuffleFile.length());
+  }
+
+  /**
+   * Sort-based shuffle data uses an index file called "shuffle_ShuffleId_MapId_0.index" that
+   * points into a data file called "shuffle_ShuffleId_MapId_0.data". This logic is from
+   * IndexShuffleBlockManager; the block id format is from ShuffleDataBlockId and ShuffleIndexBlockId.
+   */
+  private ManagedBuffer getSortBasedShuffleBlockData(
+    ExecutorShuffleInfo executor, int shuffleId, int mapId, int reduceId) {
+    File indexFile = getFile(executor.localDirs, executor.subDirsPerLocalDir,
+      "shuffle_" + shuffleId + "_" + mapId + "_0.index");
+
+    DataInputStream in = null;
+    try {
+      in = new DataInputStream(new FileInputStream(indexFile));
+      in.skipBytes(reduceId * 8);
+      long offset = in.readLong();
+      long nextOffset = in.readLong();
+      return new FileSegmentManagedBuffer(
+        getFile(executor.localDirs, executor.subDirsPerLocalDir,
+          "shuffle_" + shuffleId + "_" + mapId + "_0.data"),
+        offset,
+        nextOffset - offset);
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to open file: " + indexFile, e);
+    } finally {
+      if (in != null) {
+        JavaUtils.closeQuietly(in);
+      }
+    }
+  }
+
+  /**
+   * Hashes a filename into the corresponding local directory, in a manner consistent with
+   * Spark's DiskBlockManager.getFile().
+   */
+  @VisibleForTesting
+  static File getFile(String[] localDirs, int subDirsPerLocalDir, String filename) {
+    int hash = JavaUtils.nonNegativeHash(filename);
+    String localDir = localDirs[hash % localDirs.length];
+    int subDirId = (hash / localDirs.length) % subDirsPerLocalDir;
+    return new File(new File(localDir, String.format("%02x", subDirId)), filename);
+  }
+
+  /** For testing, clears all registered executors. */
+  @VisibleForTesting
+  void clearRegisteredExecutors() {
+    executors.clear();
+  }
+}

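A worked sketch of the lookups above. getFile() is package-private, shown here only to illustrate the hashing; the resulting path is an example:

    // Hash the filename across local dirs, then into one of 64 subdirectories
    // named by a two-digit hex id, consistent with DiskBlockManager.getFile().
    String[] localDirs = { "/data/1/spark-local", "/data/2/spark-local" };
    File f = ExternalShuffleBlockManager.getFile(localDirs, 64, "shuffle_0_5_0.index");
    // e.g. /data/2/spark-local/0e/shuffle_0_5_0.index

    // In the index file, the segment for reduceId r is [offset(r), offset(r+1)):
    //   skip r * 8 bytes, then read two consecutive longs.
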
http://git-wip-us.apache.org/repos/asf/spark/blob/f55218ae/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
----------------------------------------------------------------------
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
new file mode 100644
index 0000000..cc2f626
--- /dev/null
+++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.spark.network.TransportContext;
+import org.apache.spark.network.client.TransportClient;
+import org.apache.spark.network.client.TransportClientFactory;
+import org.apache.spark.network.server.NoOpRpcHandler;
+import org.apache.spark.network.shuffle.ExternalShuffleMessages.RegisterExecutor;
+import org.apache.spark.network.util.JavaUtils;
+import org.apache.spark.network.util.TransportConf;
+
+/**
+ * Client for reading shuffle blocks from an external (outside-of-executor) server. This is
+ * used instead of reading shuffle blocks directly from other executors (via
+ * BlockTransferService), which has the downside of losing the shuffle data if the
+ * executors are lost.
+ */
+public class ExternalShuffleClient implements ShuffleClient {
+  private final Logger logger = LoggerFactory.getLogger(ExternalShuffleClient.class);
+
+  private final TransportClientFactory clientFactory;
+  private final String appId;
+
+  public ExternalShuffleClient(TransportConf conf, String appId) {
+    TransportContext context = new TransportContext(conf, new NoOpRpcHandler());
+    this.clientFactory = context.createClientFactory();
+    this.appId = appId;
+  }
+
+  @Override
+  public void fetchBlocks(
+      String host,
+      int port,
+      String execId,
+      String[] blockIds,
+      BlockFetchingListener listener) {
+    logger.debug("External shuffle fetch from {}:{} (executor id {})", host, port, execId);
+    try {
+      TransportClient client = clientFactory.createClient(host, port);
+      new OneForOneBlockFetcher(client, blockIds, listener)
+        .start(new ExternalShuffleMessages.OpenShuffleBlocks(appId, execId, blockIds));
+    } catch (Exception e) {
+      logger.error("Exception while beginning fetchBlocks", e);
+      for (String blockId : blockIds) {
+        listener.onBlockFetchFailure(blockId, e);
+      }
+    }
+  }
+
+  /**
+   * Registers this executor with an external shuffle server. This registration is required to
+   * inform the shuffle server about where and how we store our shuffle files.
+   *
+   * @param host Host of shuffle server.
+   * @param port Port of shuffle server.
+   * @param execId This Executor's id.
+   * @param executorInfo Contains all info necessary for the service to find our shuffle files.
+   */
+  public void registerWithShuffleServer(
+      String host,
+      int port,
+      String execId,
+      ExecutorShuffleInfo executorInfo) {
+    TransportClient client = clientFactory.createClient(host, port);
+    byte[] registerExecutorMessage =
+      JavaUtils.serialize(new RegisterExecutor(appId, execId, executorInfo));
+    client.sendRpcSync(registerExecutorMessage, 5000 /* timeoutMs */);
+  }
+}

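End-to-end usage sketch (host, port, and ids are placeholders; info and listener are as in the earlier sketches):

    TransportConf conf = new TransportConf(new SystemPropertyConfigProvider());
    ExternalShuffleClient shuffleClient = new ExternalShuffleClient(conf, "app-1");

    // Each executor registers once so the service can locate its files...
    shuffleClient.registerWithShuffleServer("shuffle-host", 7337, "exec-0", info);

    // ...after which its shuffle blocks survive the executor and can be fetched.
    shuffleClient.fetchBlocks("shuffle-host", 7337, "exec-0",
      new String[] { "shuffle_0_0_0", "shuffle_0_1_0" }, listener);
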
http://git-wip-us.apache.org/repos/asf/spark/blob/f55218ae/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleMessages.java
----------------------------------------------------------------------
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleMessages.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleMessages.java
new file mode 100644
index 0000000..e79420e
--- /dev/null
+++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleMessages.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+import java.io.Serializable;
+import java.util.Arrays;
+
+import com.google.common.base.Objects;
+
+/** Messages handled by the {@link ExternalShuffleBlockHandler}. */
+public class ExternalShuffleMessages {
+
+  /** Request to read a set of shuffle blocks. Returns {@link ShuffleStreamHandle}. */
+  public static class OpenShuffleBlocks implements Serializable {
+    public final String appId;
+    public final String execId;
+    public final String[] blockIds;
+
+    public OpenShuffleBlocks(String appId, String execId, String[] blockIds) {
+      this.appId = appId;
+      this.execId = execId;
+      this.blockIds = blockIds;
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hashCode(appId, execId) * 41 + Arrays.hashCode(blockIds);
+    }
+
+    @Override
+    public String toString() {
+      return Objects.toStringHelper(this)
+        .add("appId", appId)
+        .add("execId", execId)
+        .add("blockIds", Arrays.toString(blockIds))
+        .toString();
+    }
+
+    @Override
+    public boolean equals(Object other) {
+      if (other instanceof OpenShuffleBlocks) {
+        OpenShuffleBlocks o = (OpenShuffleBlocks) other;
+        return Objects.equal(appId, o.appId)
+          && Objects.equal(execId, o.execId)
+          && Arrays.equals(blockIds, o.blockIds);
+      }
+      return false;
+    }
+  }
+
+  /** Initial registration message between an executor and its local shuffle server. */
+  public static class RegisterExecutor implements Serializable {
+    public final String appId;
+    public final String execId;
+    public final ExecutorShuffleInfo executorInfo;
+
+    public RegisterExecutor(
+        String appId,
+        String execId,
+        ExecutorShuffleInfo executorInfo) {
+      this.appId = appId;
+      this.execId = execId;
+      this.executorInfo = executorInfo;
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hashCode(appId, execId, executorInfo);
+    }
+
+    @Override
+    public String toString() {
+      return Objects.toStringHelper(this)
+        .add("appId", appId)
+        .add("execId", execId)
+        .add("executorInfo", executorInfo)
+        .toString();
+    }
+
+    @Override
+    public boolean equals(Object other) {
+      if (other instanceof RegisterExecutor) {
+        RegisterExecutor o = (RegisterExecutor) other;
+        return Objects.equal(appId, o.appId)
+          && Objects.equal(execId, o.execId)
+          && Objects.equal(executorInfo, o.executorInfo);
+      }
+      return false;
+    }
+  }
+}
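
Since both messages travel as plain Java-serialized payloads, the server side reduces to a
deserialize-and-branch. The sketch below shows only that shape; the dispatch the real
ExternalShuffleBlockHandler performs in this patch may differ in its details, and the class and
method names here are made up for illustration.

    import org.apache.spark.network.shuffle.ExternalShuffleMessages.OpenShuffleBlocks;
    import org.apache.spark.network.shuffle.ExternalShuffleMessages.RegisterExecutor;
    import org.apache.spark.network.util.JavaUtils;

    public class DispatchSketch {
      static void dispatch(byte[] rawBytes) {
        Object msg = JavaUtils.deserialize(rawBytes);
        if (msg instanceof OpenShuffleBlocks) {
          OpenShuffleBlocks open = (OpenShuffleBlocks) msg;
          // Locate open.blockIds for (open.appId, open.execId), register a stream of
          // buffers, and reply with a serialized ShuffleStreamHandle.
        } else if (msg instanceof RegisterExecutor) {
          RegisterExecutor reg = (RegisterExecutor) msg;
          // Record reg.executorInfo under (reg.appId, reg.execId).
        } else {
          throw new UnsupportedOperationException("Unexpected message: " + msg);
        }
      }
    }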

http://git-wip-us.apache.org/repos/asf/spark/blob/f55218ae/network/shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
----------------------------------------------------------------------
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
new file mode 100644
index 0000000..39b6f30
--- /dev/null
+++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+import java.util.Arrays;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.spark.network.buffer.ManagedBuffer;
+import org.apache.spark.network.client.ChunkReceivedCallback;
+import org.apache.spark.network.client.RpcResponseCallback;
+import org.apache.spark.network.client.TransportClient;
+import org.apache.spark.network.util.JavaUtils;
+
+/**
+ * Simple wrapper on top of a TransportClient which interprets each chunk as a whole block, and
+ * invokes the BlockFetchingListener appropriately. This class is agnostic to the actual RPC
+ * handler, as long as there is a single "open blocks" message which returns a ShuffleStreamHandle,
+ * and Java serialization is used.
+ *
+ * Note that this typically corresponds to a
+ * {@link org.apache.spark.network.server.OneForOneStreamManager} on the server side.
+ */
+public class OneForOneBlockFetcher {
+  private final Logger logger = LoggerFactory.getLogger(OneForOneBlockFetcher.class);
+
+  private final TransportClient client;
+  private final String[] blockIds;
+  private final BlockFetchingListener listener;
+  private final ChunkReceivedCallback chunkCallback;
+
+  private ShuffleStreamHandle streamHandle = null;
+
+  public OneForOneBlockFetcher(
+      TransportClient client,
+      String[] blockIds,
+      BlockFetchingListener listener) {
+    if (blockIds.length == 0) {
+      throw new IllegalArgumentException("Zero-sized blockIds array");
+    }
+    this.client = client;
+    this.blockIds = blockIds;
+    this.listener = listener;
+    this.chunkCallback = new ChunkCallback();
+  }
+
+  /** Callback invoked on receipt of each chunk. We equate a single chunk to a single block. */
+  private class ChunkCallback implements ChunkReceivedCallback {
+    @Override
+    public void onSuccess(int chunkIndex, ManagedBuffer buffer) {
+      // On receipt of a chunk, pass it upwards as a block.
+      listener.onBlockFetchSuccess(blockIds[chunkIndex], buffer);
+    }
+
+    @Override
+    public void onFailure(int chunkIndex, Throwable e) {
+      // On receipt of a failure, fail every block from chunkIndex onwards.
+      String[] remainingBlockIds = Arrays.copyOfRange(blockIds, chunkIndex, blockIds.length);
+      failRemainingBlocks(remainingBlockIds, e);
+    }
+  }
+
+  /**
+   * Begins the fetching process, calling the listener with every block fetched.
+   * The given message will be serialized with the Java serializer, and the RPC must return a
+   * {@link ShuffleStreamHandle}. We will send all fetch requests immediately, without throttling.
+   */
+  public void start(Object openBlocksMessage) {
+    client.sendRpc(JavaUtils.serialize(openBlocksMessage), new RpcResponseCallback() {
+      @Override
+      public void onSuccess(byte[] response) {
+        try {
+          streamHandle = JavaUtils.deserialize(response);
+          logger.trace("Successfully opened blocks {}, preparing to fetch chunks.", streamHandle);
+
+          // Immediately request all chunks -- we expect that the total size of the request is
+          // reasonable due to higher level chunking in [[ShuffleBlockFetcherIterator]].
+          for (int i = 0; i < streamHandle.numChunks; i++) {
+            client.fetchChunk(streamHandle.streamId, i, chunkCallback);
+          }
+        } catch (Exception e) {
+          logger.error("Failed while starting block fetches", e);
+          failRemainingBlocks(blockIds, e);
+        }
+      }
+
+      @Override
+      public void onFailure(Throwable e) {
+        logger.error("Failed while starting block fetches", e);
+        failRemainingBlocks(blockIds, e);
+      }
+    });
+  }
+
+  /** Invokes the "onBlockFetchFailure" callback for every listed block id. */
+  private void failRemainingBlocks(String[] failedBlockIds, Throwable e) {
+    for (String blockId : failedBlockIds) {
+      try {
+        listener.onBlockFetchFailure(blockId, e);
+      } catch (Exception e2) {
+        logger.error("Error in block fetch failure callback", e2);
+      }
+    }
+  }
+}
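
Concretely, the handshake above is two steps: a single RPC that opens the blocks on the server
and returns a handle, then one chunk fetch per block, with chunk i delivered to the listener as
blockIds[i]. A brief sketch of driving it, assuming a connected TransportClient and a listener
already exist (the block ids and app/executor ids are placeholders):

    import org.apache.spark.network.client.TransportClient;
    import org.apache.spark.network.shuffle.BlockFetchingListener;
    import org.apache.spark.network.shuffle.ExternalShuffleMessages.OpenShuffleBlocks;
    import org.apache.spark.network.shuffle.OneForOneBlockFetcher;

    public class FetcherSketch {
      // Chunk i of the opened stream corresponds one-for-one to blockIds[i].
      static void fetch(TransportClient client, BlockFetchingListener listener) {
        String[] blockIds = { "shuffle_0_0_0", "shuffle_0_0_1" };  // placeholder ids
        new OneForOneBlockFetcher(client, blockIds, listener)
          .start(new OpenShuffleBlocks("app0", "exec0", blockIds));
      }
    }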

http://git-wip-us.apache.org/repos/asf/spark/blob/f55218ae/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleClient.java
----------------------------------------------------------------------
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleClient.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleClient.java
new file mode 100644
index 0000000..9fa87c2
--- /dev/null
+++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleClient.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+/** Provides an interface for reading shuffle files, either from an Executor or an external service. */
+public interface ShuffleClient {
+  /**
+   * Fetches a sequence of blocks from a remote node asynchronously.
+   *
+   * Note that this API takes a sequence of block ids so the implementation can batch requests,
+   * and it does not return a future so the underlying implementation can invoke
+   * onBlockFetchSuccess as soon as each block's data is fetched, rather than waiting for all
+   * blocks to be fetched.
+   */
+  public void fetchBlocks(
+      String host,
+      int port,
+      String execId,
+      String[] blockIds,
+      BlockFetchingListener listener);
+}
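
Because fetchBlocks is callback-driven rather than future-based, a caller that needs every block
before proceeding has to track completion itself. One way to do that, sketched assuming
ManagedBuffer's reference counting (retain/release) and with illustrative class and method names:

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.CountDownLatch;

    import org.apache.spark.network.buffer.ManagedBuffer;
    import org.apache.spark.network.shuffle.BlockFetchingListener;
    import org.apache.spark.network.shuffle.ShuffleClient;

    public class BlockingFetchSketch {
      /** Blocks until every requested block has either succeeded or failed. */
      static Map<String, ManagedBuffer> fetchAll(
          ShuffleClient client,
          String host,
          int port,
          String execId,
          String[] blockIds) throws InterruptedException {
        final CountDownLatch done = new CountDownLatch(blockIds.length);
        final Map<String, ManagedBuffer> results =
          new ConcurrentHashMap<String, ManagedBuffer>();
        client.fetchBlocks(host, port, execId, blockIds, new BlockFetchingListener() {
          @Override
          public void onBlockFetchSuccess(String blockId, ManagedBuffer data) {
            data.retain();  // keep the buffer valid beyond this callback
            results.put(blockId, data);
            done.countDown();
          }

          @Override
          public void onBlockFetchFailure(String blockId, Throwable t) {
            done.countDown();
          }
        });
        done.await();
        return results;  // contains successfully fetched blocks only
      }
    }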

http://git-wip-us.apache.org/repos/asf/spark/blob/f55218ae/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleStreamHandle.java
----------------------------------------------------------------------
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleStreamHandle.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleStreamHandle.java
new file mode 100644
index 0000000..9c94691
--- /dev/null
+++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleStreamHandle.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+import java.io.Serializable;
+import java.util.Arrays;
+
+import com.google.common.base.Objects;
+
+/**
+ * Identifier for a fixed number of chunks to read from a stream created by an "open blocks"
+ * message. This is used by {@link OneForOneBlockFetcher}.
+ */
+public class ShuffleStreamHandle implements Serializable {
+  public final long streamId;
+  public final int numChunks;
+
+  public ShuffleStreamHandle(long streamId, int numChunks) {
+    this.streamId = streamId;
+    this.numChunks = numChunks;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(streamId, numChunks);
+  }
+
+  @Override
+  public String toString() {
+    return Objects.toStringHelper(this)
+      .add("streamId", streamId)
+      .add("numChunks", numChunks)
+      .toString();
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (other instanceof ShuffleStreamHandle) {
+      ShuffleStreamHandle o = (ShuffleStreamHandle) other;
+      return streamId == o.streamId && numChunks == o.numChunks;
+    }
+    return false;
+  }
+}
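
Since the handle crosses the wire via plain Java serialization (see OneForOneBlockFetcher.start
above), the reply path amounts to a simple round trip. A small sketch, with a made-up stream id
and chunk count:

    import org.apache.spark.network.shuffle.ShuffleStreamHandle;
    import org.apache.spark.network.util.JavaUtils;

    public class HandleRoundTripSketch {
      public static void main(String[] args) {
        // Server side: after registering a stream, describe it to the client.
        ShuffleStreamHandle handle = new ShuffleStreamHandle(42L, 2 /* numChunks */);
        byte[] wire = JavaUtils.serialize(handle);

        // Client side: recover the handle and fetch chunks 0 .. numChunks - 1.
        ShuffleStreamHandle received = JavaUtils.deserialize(wire);
        assert received.equals(handle) : "round trip should preserve the handle";
      }
    }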

http://git-wip-us.apache.org/repos/asf/spark/blob/f55218ae/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java
----------------------------------------------------------------------
diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java
new file mode 100644
index 0000000..7939cb4
--- /dev/null
+++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import static org.apache.spark.network.shuffle.ExternalShuffleMessages.OpenShuffleBlocks;
+import static org.apache.spark.network.shuffle.ExternalShuffleMessages.RegisterExecutor;
+import static org.junit.Assert.*;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.*;
+
+import org.apache.spark.network.buffer.ManagedBuffer;
+import org.apache.spark.network.buffer.NioManagedBuffer;
+import org.apache.spark.network.client.RpcResponseCallback;
+import org.apache.spark.network.client.TransportClient;
+import org.apache.spark.network.server.OneForOneStreamManager;
+import org.apache.spark.network.server.RpcHandler;
+import org.apache.spark.network.util.JavaUtils;
+
+public class ExternalShuffleBlockHandlerSuite {
+  TransportClient client = mock(TransportClient.class);
+
+  OneForOneStreamManager streamManager;
+  ExternalShuffleBlockManager blockManager;
+  RpcHandler handler;
+
+  @Before
+  public void beforeEach() {
+    streamManager = mock(OneForOneStreamManager.class);
+    blockManager = mock(ExternalShuffleBlockManager.class);
+    handler = new ExternalShuffleBlockHandler(streamManager, blockManager);
+  }
+
+  @Test
+  public void testRegisterExecutor() {
+    RpcResponseCallback callback = mock(RpcResponseCallback.class);
+
+    ExecutorShuffleInfo config = new ExecutorShuffleInfo(new String[] {"/a", "/b"}, 16, "sort");
+    byte[] registerMessage = JavaUtils.serialize(
+      new RegisterExecutor("app0", "exec1", config));
+    handler.receive(client, registerMessage, callback);
+    verify(blockManager, times(1)).registerExecutor("app0", "exec1", config);
+
+    verify(callback, times(1)).onSuccess((byte[]) any());
+    verify(callback, never()).onFailure((Throwable) any());
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testOpenShuffleBlocks() {
+    RpcResponseCallback callback = mock(RpcResponseCallback.class);
+
+    ManagedBuffer block0Marker = new NioManagedBuffer(ByteBuffer.wrap(new byte[3]));
+    ManagedBuffer block1Marker = new NioManagedBuffer(ByteBuffer.wrap(new byte[7]));
+    when(blockManager.getBlockData("app0", "exec1", "b0")).thenReturn(block0Marker);
+    when(blockManager.getBlockData("app0", "exec1", "b1")).thenReturn(block1Marker);
+    byte[] openBlocksMessage = JavaUtils.serialize(
+      new OpenShuffleBlocks("app0", "exec1", new String[] { "b0", "b1" }));
+    handler.receive(client, openBlocksMessage, callback);
+    verify(blockManager, times(1)).getBlockData("app0", "exec1", "b0");
+    verify(blockManager, times(1)).getBlockData("app0", "exec1", "b1");
+
+    ArgumentCaptor<byte[]> response = ArgumentCaptor.forClass(byte[].class);
+    verify(callback, times(1)).onSuccess(response.capture());
+    verify(callback, never()).onFailure((Throwable) any());
+
+    ShuffleStreamHandle handle = JavaUtils.deserialize(response.getValue());
+    assertEquals(2, handle.numChunks);
+
+    ArgumentCaptor<Iterator> stream = ArgumentCaptor.forClass(Iterator.class);
+    verify(streamManager, times(1)).registerStream(stream.capture());
+    Iterator<ManagedBuffer> buffers = (Iterator<ManagedBuffer>) stream.getValue();
+    assertEquals(block0Marker, buffers.next());
+    assertEquals(block1Marker, buffers.next());
+    assertFalse(buffers.hasNext());
+  }
+
+  @Test
+  public void testBadMessages() {
+    RpcResponseCallback callback = mock(RpcResponseCallback.class);
+
+    byte[] unserializableMessage = new byte[] { 0x12, 0x34, 0x56 };
+    try {
+      handler.receive(client, unserializableMessage, callback);
+      fail("Should have thrown");
+    } catch (Exception e) {
+      // pass
+    }
+
+    byte[] unexpectedMessage = JavaUtils.serialize(
+      new ExecutorShuffleInfo(new String[] {"/a", "/b"}, 16, "sort"));
+    try {
+      handler.receive(client, unexpectedMessage, callback);
+      fail("Should have thrown");
+    } catch (UnsupportedOperationException e) {
+      // pass
+    }
+
+    verify(callback, never()).onSuccess((byte[]) any());
+    verify(callback, never()).onFailure((Throwable) any());
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/f55218ae/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManagerSuite.java
----------------------------------------------------------------------
diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManagerSuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManagerSuite.java
new file mode 100644
index 0000000..da54797
--- /dev/null
+++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManagerSuite.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+
+import com.google.common.io.CharStreams;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class ExternalShuffleBlockManagerSuite {
+  static String sortBlock0 = "Hello!";
+  static String sortBlock1 = "World!";
+
+  static String hashBlock0 = "Elementary";
+  static String hashBlock1 = "Tabular";
+
+  static TestShuffleDataContext dataContext;
+
+  @BeforeClass
+  public static void beforeAll() throws IOException {
+    dataContext = new TestShuffleDataContext(2, 5);
+
+    dataContext.create();
+    // Write some sort and hash data.
+    dataContext.insertSortShuffleData(0, 0,
+      new byte[][] { sortBlock0.getBytes(), sortBlock1.getBytes() });
+    dataContext.insertHashShuffleData(1, 0,
+      new byte[][] { hashBlock0.getBytes(), hashBlock1.getBytes() });
+  }
+
+  @AfterClass
+  public static void afterAll() {
+    dataContext.cleanup();
+  }
+
+  @Test
+  public void testBadRequests() {
+    ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager();
+    // Unregistered executor
+    try {
+      manager.getBlockData("app0", "exec1", "shuffle_1_1_0");
+      fail("Should have failed");
+    } catch (RuntimeException e) {
+      assertTrue("Bad error message: " + e, e.getMessage().contains("not registered"));
+    }
+
+    // Invalid shuffle manager
+    manager.registerExecutor("app0", "exec2", dataContext.createExecutorInfo("foobar"));
+    try {
+      manager.getBlockData("app0", "exec2", "shuffle_1_1_0");
+      fail("Should have failed");
+    } catch (UnsupportedOperationException e) {
+      // pass
+    }
+
+    // Nonexistent shuffle block
+    manager.registerExecutor("app0", "exec3",
+      dataContext.createExecutorInfo("org.apache.spark.shuffle.sort.SortShuffleManager"));
+    try {
+      manager.getBlockData("app0", "exec3", "shuffle_1_1_0");
+      fail("Should have failed");
+    } catch (Exception e) {
+      // pass
+    }
+  }
+
+  @Test
+  public void testSortShuffleBlocks() throws IOException {
+    ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager();
+    manager.registerExecutor("app0", "exec0",
+      dataContext.createExecutorInfo("org.apache.spark.shuffle.sort.SortShuffleManager"));
+
+    InputStream block0Stream =
+      manager.getBlockData("app0", "exec0", "shuffle_0_0_0").createInputStream();
+    String block0 = CharStreams.toString(new InputStreamReader(block0Stream));
+    block0Stream.close();
+    assertEquals(sortBlock0, block0);
+
+    InputStream block1Stream =
+      manager.getBlockData("app0", "exec0", "shuffle_0_0_1").createInputStream();
+    String block1 = CharStreams.toString(new InputStreamReader(block1Stream));
+    block1Stream.close();
+    assertEquals(sortBlock1, block1);
+  }
+
+  @Test
+  public void testHashShuffleBlocks() throws IOException {
+    ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager();
+    manager.registerExecutor("app0", "exec0",
+      dataContext.createExecutorInfo("org.apache.spark.shuffle.hash.HashShuffleManager"));
+
+    InputStream block0Stream =
+      manager.getBlockData("app0", "exec0", "shuffle_1_0_0").createInputStream();
+    String block0 = CharStreams.toString(new InputStreamReader(block0Stream));
+    block0Stream.close();
+    assertEquals(hashBlock0, block0);
+
+    InputStream block1Stream =
+      manager.getBlockData("app0", "exec0", "shuffle_1_0_1").createInputStream();
+    String block1 = CharStreams.toString(new InputStreamReader(block1Stream));
+    block1Stream.close();
+    assertEquals(hashBlock1, block1);
+  }
+}

