Posted to commits@spark.apache.org by rx...@apache.org on 2016/02/29 02:25:25 UTC

[09/14] spark git commit: [SPARK-13529][BUILD] Move network/* modules into common/network-*

http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
new file mode 100644
index 0000000..fe933ed
--- /dev/null
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
@@ -0,0 +1,449 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Charsets;
+import com.google.common.base.Objects;
+import com.google.common.collect.Maps;
+import org.fusesource.leveldbjni.JniDBFactory;
+import org.fusesource.leveldbjni.internal.NativeDB;
+import org.iq80.leveldb.DB;
+import org.iq80.leveldb.DBIterator;
+import org.iq80.leveldb.Options;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.spark.network.buffer.FileSegmentManagedBuffer;
+import org.apache.spark.network.buffer.ManagedBuffer;
+import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
+import org.apache.spark.network.util.JavaUtils;
+import org.apache.spark.network.util.NettyUtils;
+import org.apache.spark.network.util.TransportConf;
+
+/**
+ * Manages converting shuffle BlockIds into physical segments of local files, from a process outside
+ * of Executors. Each Executor must register its own configuration about where it stores its files
+ * (local dirs) and how (shuffle manager). The logic for retrieval of individual files is replicated
+ * from Spark's FileShuffleBlockResolver and IndexShuffleBlockResolver.
+ */
+public class ExternalShuffleBlockResolver {
+  private static final Logger logger = LoggerFactory.getLogger(ExternalShuffleBlockResolver.class);
+
+  private static final ObjectMapper mapper = new ObjectMapper();
+  /**
+   * This is a common prefix to the key for each app registration we stick in leveldb, so they
+   * are easy to find, since leveldb lets you search based on prefix.
+   */
+  private static final String APP_KEY_PREFIX = "AppExecShuffleInfo";
+  private static final StoreVersion CURRENT_VERSION = new StoreVersion(1, 0);
+
+  // Map containing all registered executors' metadata.
+  @VisibleForTesting
+  final ConcurrentMap<AppExecId, ExecutorShuffleInfo> executors;
+
+  // Single-threaded Java executor used to perform expensive recursive directory deletion.
+  private final Executor directoryCleaner;
+
+  private final TransportConf conf;
+
+  @VisibleForTesting
+  final File registeredExecutorFile;
+  @VisibleForTesting
+  final DB db;
+
+  public ExternalShuffleBlockResolver(TransportConf conf, File registeredExecutorFile)
+      throws IOException {
+    this(conf, registeredExecutorFile, Executors.newSingleThreadExecutor(
+        // Add `spark` prefix because it will run in NM in Yarn mode.
+        NettyUtils.createThreadFactory("spark-shuffle-directory-cleaner")));
+  }
+
+  // Allows tests to have more control over when directories are cleaned up.
+  @VisibleForTesting
+  ExternalShuffleBlockResolver(
+      TransportConf conf,
+      File registeredExecutorFile,
+      Executor directoryCleaner) throws IOException {
+    this.conf = conf;
+    this.registeredExecutorFile = registeredExecutorFile;
+    if (registeredExecutorFile != null) {
+      Options options = new Options();
+      options.createIfMissing(false);
+      options.logger(new LevelDBLogger());
+      DB tmpDb;
+      try {
+        tmpDb = JniDBFactory.factory.open(registeredExecutorFile, options);
+      } catch (NativeDB.DBException e) {
+        if (e.isNotFound() || e.getMessage().contains(" does not exist ")) {
+          logger.info("Creating state database at " + registeredExecutorFile);
+          options.createIfMissing(true);
+          try {
+            tmpDb = JniDBFactory.factory.open(registeredExecutorFile, options);
+          } catch (NativeDB.DBException dbExc) {
+            throw new IOException("Unable to create state store", dbExc);
+          }
+        } else {
+          // the leveldb file seems to be corrupt somehow. Let's just blow it away and create a
+          // new one, so we can keep processing new apps.
+          logger.error("error opening leveldb file {}.  Creating new file, will not be able to " +
+            "recover state for existing applications", registeredExecutorFile, e);
+          if (registeredExecutorFile.isDirectory()) {
+            for (File f : registeredExecutorFile.listFiles()) {
+              if (!f.delete()) {
+                logger.warn("error deleting {}", f.getPath());
+              }
+            }
+          }
+          if (!registeredExecutorFile.delete()) {
+            logger.warn("error deleting {}", registeredExecutorFile.getPath());
+          }
+          options.createIfMissing(true);
+          try {
+            tmpDb = JniDBFactory.factory.open(registeredExecutorFile, options);
+          } catch (NativeDB.DBException dbExc) {
+            throw new IOException("Unable to create state store", dbExc);
+          }
+
+        }
+      }
+      // if there is a version mismatch, we throw an exception, which means the service is unusable
+      checkVersion(tmpDb);
+      executors = reloadRegisteredExecutors(tmpDb);
+      db = tmpDb;
+    } else {
+      db = null;
+      executors = Maps.newConcurrentMap();
+    }
+    this.directoryCleaner = directoryCleaner;
+  }
+
+  /** Registers a new Executor with all the configuration we need to find its shuffle files. */
+  public void registerExecutor(
+      String appId,
+      String execId,
+      ExecutorShuffleInfo executorInfo) {
+    AppExecId fullId = new AppExecId(appId, execId);
+    logger.info("Registered executor {} with {}", fullId, executorInfo);
+    try {
+      if (db != null) {
+        byte[] key = dbAppExecKey(fullId);
+        byte[] value = mapper.writeValueAsString(executorInfo).getBytes(Charsets.UTF_8);
+        db.put(key, value);
+      }
+    } catch (Exception e) {
+      logger.error("Error saving registered executors", e);
+    }
+    executors.put(fullId, executorInfo);
+  }
+
+  /**
+   * Obtains a FileSegmentManagedBuffer from a shuffle block id. We expect the blockId to have the
+   * format "shuffle_ShuffleId_MapId_ReduceId" (from ShuffleBlockId), and additionally make
+   * assumptions about how the hash and sort based shuffles store their data.
+   */
+  public ManagedBuffer getBlockData(String appId, String execId, String blockId) {
+    String[] blockIdParts = blockId.split("_");
+    if (blockIdParts.length < 4) {
+      throw new IllegalArgumentException("Unexpected block id format: " + blockId);
+    } else if (!blockIdParts[0].equals("shuffle")) {
+      throw new IllegalArgumentException("Expected shuffle block id, got: " + blockId);
+    }
+    int shuffleId = Integer.parseInt(blockIdParts[1]);
+    int mapId = Integer.parseInt(blockIdParts[2]);
+    int reduceId = Integer.parseInt(blockIdParts[3]);
+
+    ExecutorShuffleInfo executor = executors.get(new AppExecId(appId, execId));
+    if (executor == null) {
+      throw new RuntimeException(
+        String.format("Executor is not registered (appId=%s, execId=%s)", appId, execId));
+    }
+
+    if ("sort".equals(executor.shuffleManager) || "tungsten-sort".equals(executor.shuffleManager)) {
+      return getSortBasedShuffleBlockData(executor, shuffleId, mapId, reduceId);
+    } else if ("hash".equals(executor.shuffleManager)) {
+      return getHashBasedShuffleBlockData(executor, blockId);
+    } else {
+      throw new UnsupportedOperationException(
+        "Unsupported shuffle manager: " + executor.shuffleManager);
+    }
+  }
+
+  /**
+   * Removes our metadata of all executors registered for the given application, and optionally
+   * also deletes the local directories associated with the executors of that application in a
+   * separate thread.
+   *
+   * It is not valid to call registerExecutor() for an executor with this appId after invoking
+   * this method.
+   */
+  public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
+    logger.info("Application {} removed, cleanupLocalDirs = {}", appId, cleanupLocalDirs);
+    Iterator<Map.Entry<AppExecId, ExecutorShuffleInfo>> it = executors.entrySet().iterator();
+    while (it.hasNext()) {
+      Map.Entry<AppExecId, ExecutorShuffleInfo> entry = it.next();
+      AppExecId fullId = entry.getKey();
+      final ExecutorShuffleInfo executor = entry.getValue();
+
+      // Only touch executors associated with the appId that was removed.
+      if (appId.equals(fullId.appId)) {
+        it.remove();
+        if (db != null) {
+          try {
+            db.delete(dbAppExecKey(fullId));
+          } catch (IOException e) {
+            logger.error("Error deleting {} from executor state db", appId, e);
+          }
+        }
+
+        if (cleanupLocalDirs) {
+          logger.info("Cleaning up executor {}'s {} local dirs", fullId, executor.localDirs.length);
+
+          // Execute the actual deletion in a different thread, as it may take some time.
+          directoryCleaner.execute(new Runnable() {
+            @Override
+            public void run() {
+              deleteExecutorDirs(executor.localDirs);
+            }
+          });
+        }
+      }
+    }
+  }
+
+  /**
+   * Synchronously deletes each directory one at a time.
+   * Should be executed in its own thread, as this may take a long time.
+   */
+  private void deleteExecutorDirs(String[] dirs) {
+    for (String localDir : dirs) {
+      try {
+        JavaUtils.deleteRecursively(new File(localDir));
+        logger.debug("Successfully cleaned up directory: " + localDir);
+      } catch (Exception e) {
+        logger.error("Failed to delete directory: " + localDir, e);
+      }
+    }
+  }
+
+  /**
+   * Hash-based shuffle data is simply stored as one file per block.
+   * This logic is from FileShuffleBlockResolver.
+   */
+  private ManagedBuffer getHashBasedShuffleBlockData(ExecutorShuffleInfo executor, String blockId) {
+    File shuffleFile = getFile(executor.localDirs, executor.subDirsPerLocalDir, blockId);
+    return new FileSegmentManagedBuffer(conf, shuffleFile, 0, shuffleFile.length());
+  }
+
+  /**
+   * Sort-based shuffle data uses an index called "shuffle_ShuffleId_MapId_0.index" into a data file
+   * called "shuffle_ShuffleId_MapId_0.data". This logic is from IndexShuffleBlockResolver,
+   * and the block id format is from ShuffleDataBlockId and ShuffleIndexBlockId.
+   */
+  private ManagedBuffer getSortBasedShuffleBlockData(
+    ExecutorShuffleInfo executor, int shuffleId, int mapId, int reduceId) {
+    File indexFile = getFile(executor.localDirs, executor.subDirsPerLocalDir,
+      "shuffle_" + shuffleId + "_" + mapId + "_0.index");
+
+    DataInputStream in = null;
+    try {
+      in = new DataInputStream(new FileInputStream(indexFile));
+      in.skipBytes(reduceId * 8);
+      long offset = in.readLong();
+      long nextOffset = in.readLong();
+      return new FileSegmentManagedBuffer(
+        conf,
+        getFile(executor.localDirs, executor.subDirsPerLocalDir,
+          "shuffle_" + shuffleId + "_" + mapId + "_0.data"),
+        offset,
+        nextOffset - offset);
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to open file: " + indexFile, e);
+    } finally {
+      if (in != null) {
+        JavaUtils.closeQuietly(in);
+      }
+    }
+  }
+
+  /**
+   * Hashes a filename into the corresponding local directory, in a manner consistent with
+   * Spark's DiskBlockManager.getFile().
+   */
+  @VisibleForTesting
+  static File getFile(String[] localDirs, int subDirsPerLocalDir, String filename) {
+    int hash = JavaUtils.nonNegativeHash(filename);
+    String localDir = localDirs[hash % localDirs.length];
+    int subDirId = (hash / localDirs.length) % subDirsPerLocalDir;
+    return new File(new File(localDir, String.format("%02x", subDirId)), filename);
+  }
+
+  void close() {
+    if (db != null) {
+      try {
+        db.close();
+      } catch (IOException e) {
+        logger.error("Exception closing leveldb with registered executors", e);
+      }
+    }
+  }
+
+  /** Simply encodes an executor's full ID, which is appId + execId. */
+  public static class AppExecId {
+    public final String appId;
+    public final String execId;
+
+    @JsonCreator
+    public AppExecId(@JsonProperty("appId") String appId, @JsonProperty("execId") String execId) {
+      this.appId = appId;
+      this.execId = execId;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+
+      AppExecId appExecId = (AppExecId) o;
+      return Objects.equal(appId, appExecId.appId) && Objects.equal(execId, appExecId.execId);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hashCode(appId, execId);
+    }
+
+    @Override
+    public String toString() {
+      return Objects.toStringHelper(this)
+        .add("appId", appId)
+        .add("execId", execId)
+        .toString();
+    }
+  }
+
+  private static byte[] dbAppExecKey(AppExecId appExecId) throws IOException {
+    // we stick a common prefix on all the keys so we can find them in the DB
+    String appExecJson = mapper.writeValueAsString(appExecId);
+    String key = (APP_KEY_PREFIX + ";" + appExecJson);
+    return key.getBytes(Charsets.UTF_8);
+  }
+
+  private static AppExecId parseDbAppExecKey(String s) throws IOException {
+    if (!s.startsWith(APP_KEY_PREFIX)) {
+      throw new IllegalArgumentException("expected a string starting with " + APP_KEY_PREFIX);
+    }
+    String json = s.substring(APP_KEY_PREFIX.length() + 1);
+    AppExecId parsed = mapper.readValue(json, AppExecId.class);
+    return parsed;
+  }
+
+  @VisibleForTesting
+  static ConcurrentMap<AppExecId, ExecutorShuffleInfo> reloadRegisteredExecutors(DB db)
+      throws IOException {
+    ConcurrentMap<AppExecId, ExecutorShuffleInfo> registeredExecutors = Maps.newConcurrentMap();
+    if (db != null) {
+      DBIterator itr = db.iterator();
+      itr.seek(APP_KEY_PREFIX.getBytes(Charsets.UTF_8));
+      while (itr.hasNext()) {
+        Map.Entry<byte[], byte[]> e = itr.next();
+        String key = new String(e.getKey(), Charsets.UTF_8);
+        if (!key.startsWith(APP_KEY_PREFIX)) {
+          break;
+        }
+        AppExecId id = parseDbAppExecKey(key);
+        ExecutorShuffleInfo shuffleInfo = mapper.readValue(e.getValue(), ExecutorShuffleInfo.class);
+        registeredExecutors.put(id, shuffleInfo);
+      }
+    }
+    return registeredExecutors;
+  }
+
+  private static class LevelDBLogger implements org.iq80.leveldb.Logger {
+    private static final Logger LOG = LoggerFactory.getLogger(LevelDBLogger.class);
+
+    @Override
+    public void log(String message) {
+      LOG.info(message);
+    }
+  }
+
+  /**
+   * Simple major.minor versioning scheme.  Any incompatible changes should be across major
+   * versions.  Minor version differences are allowed -- meaning we should be able to read
+   * dbs that are either earlier *or* later on the minor version.
+   */
+  private static void checkVersion(DB db) throws IOException {
+    byte[] bytes = db.get(StoreVersion.KEY);
+    if (bytes == null) {
+      storeVersion(db);
+    } else {
+      StoreVersion version = mapper.readValue(bytes, StoreVersion.class);
+      if (version.major != CURRENT_VERSION.major) {
+        throw new IOException("cannot read state DB with version " + version + ", incompatible " +
+          "with current version " + CURRENT_VERSION);
+      }
+      storeVersion(db);
+    }
+  }
+
+  private static void storeVersion(DB db) throws IOException {
+    db.put(StoreVersion.KEY, mapper.writeValueAsBytes(CURRENT_VERSION));
+  }
+
+
+  public static class StoreVersion {
+
+    static final byte[] KEY = "StoreVersion".getBytes(Charsets.UTF_8);
+
+    public final int major;
+    public final int minor;
+
+    @JsonCreator public StoreVersion(@JsonProperty("major") int major, @JsonProperty("minor") int minor) {
+      this.major = major;
+      this.minor = minor;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+
+      StoreVersion that = (StoreVersion) o;
+
+      return major == that.major && minor == that.minor;
+    }
+
+    @Override
+    public int hashCode() {
+      int result = major;
+      result = 31 * result + minor;
+      return result;
+    }
+  }
+
+}
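
A minimal sketch (not part of this commit) of how the static getFile() helper above resolves a
block to a file on disk. The local dirs, subdirectory count, and block id are made-up examples,
and the class sits in the org.apache.spark.network.shuffle package because getFile() is
package-private.

package org.apache.spark.network.shuffle;

import java.io.File;

public class GetFileExample {
  public static void main(String[] args) {
    // Hypothetical executor layout: two local dirs, 64 subdirectories per dir.
    String[] localDirs = {"/tmp/spark-local-0", "/tmp/spark-local-1"};
    int subDirsPerLocalDir = 64;

    // Block ids follow the "shuffle_<shuffleId>_<mapId>_<reduceId>" pattern the resolver expects.
    File blockFile =
      ExternalShuffleBlockResolver.getFile(localDirs, subDirsPerLocalDir, "shuffle_0_5_3");
    System.out.println(blockFile.getPath()); // <localDir>/<two-hex-digit subdir>/shuffle_0_5_3
  }
}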

http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
new file mode 100644
index 0000000..58ca87d
--- /dev/null
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.spark.network.TransportContext;
+import org.apache.spark.network.client.TransportClient;
+import org.apache.spark.network.client.TransportClientBootstrap;
+import org.apache.spark.network.client.TransportClientFactory;
+import org.apache.spark.network.sasl.SaslClientBootstrap;
+import org.apache.spark.network.sasl.SecretKeyHolder;
+import org.apache.spark.network.server.NoOpRpcHandler;
+import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
+import org.apache.spark.network.shuffle.protocol.RegisterExecutor;
+import org.apache.spark.network.util.TransportConf;
+
+/**
+ * Client for reading shuffle blocks from an external (outside of the executor) server, rather
+ * than reading shuffle blocks directly from other executors (via BlockTransferService), which
+ * has the downside of losing the shuffle data if we lose the executors.
+ */
+public class ExternalShuffleClient extends ShuffleClient {
+  private final Logger logger = LoggerFactory.getLogger(ExternalShuffleClient.class);
+
+  private final TransportConf conf;
+  private final boolean saslEnabled;
+  private final boolean saslEncryptionEnabled;
+  private final SecretKeyHolder secretKeyHolder;
+
+  protected TransportClientFactory clientFactory;
+  protected String appId;
+
+  /**
+   * Creates an external shuffle client, with SASL optionally enabled. If SASL is not enabled,
+   * then secretKeyHolder may be null.
+   */
+  public ExternalShuffleClient(
+      TransportConf conf,
+      SecretKeyHolder secretKeyHolder,
+      boolean saslEnabled,
+      boolean saslEncryptionEnabled) {
+    Preconditions.checkArgument(
+      !saslEncryptionEnabled || saslEnabled,
+      "SASL encryption can only be enabled if SASL is also enabled.");
+    this.conf = conf;
+    this.secretKeyHolder = secretKeyHolder;
+    this.saslEnabled = saslEnabled;
+    this.saslEncryptionEnabled = saslEncryptionEnabled;
+  }
+
+  protected void checkInit() {
+    assert appId != null : "Called before init()";
+  }
+
+  @Override
+  public void init(String appId) {
+    this.appId = appId;
+    TransportContext context = new TransportContext(conf, new NoOpRpcHandler(), true);
+    List<TransportClientBootstrap> bootstraps = Lists.newArrayList();
+    if (saslEnabled) {
+      bootstraps.add(new SaslClientBootstrap(conf, appId, secretKeyHolder, saslEncryptionEnabled));
+    }
+    clientFactory = context.createClientFactory(bootstraps);
+  }
+
+  @Override
+  public void fetchBlocks(
+      final String host,
+      final int port,
+      final String execId,
+      String[] blockIds,
+      BlockFetchingListener listener) {
+    checkInit();
+    logger.debug("External shuffle fetch from {}:{} (executor id {})", host, port, execId);
+    try {
+      RetryingBlockFetcher.BlockFetchStarter blockFetchStarter =
+        new RetryingBlockFetcher.BlockFetchStarter() {
+          @Override
+          public void createAndStart(String[] blockIds, BlockFetchingListener listener)
+              throws IOException {
+            TransportClient client = clientFactory.createClient(host, port);
+            new OneForOneBlockFetcher(client, appId, execId, blockIds, listener).start();
+          }
+        };
+
+      int maxRetries = conf.maxIORetries();
+      if (maxRetries > 0) {
+        // Note this Fetcher will correctly handle maxRetries == 0; we avoid it just in case there's
+        // a bug in this code. We should remove the if statement once we're sure of the stability.
+        new RetryingBlockFetcher(conf, blockFetchStarter, blockIds, listener).start();
+      } else {
+        blockFetchStarter.createAndStart(blockIds, listener);
+      }
+    } catch (Exception e) {
+      logger.error("Exception while beginning fetchBlocks", e);
+      for (String blockId : blockIds) {
+        listener.onBlockFetchFailure(blockId, e);
+      }
+    }
+  }
+
+  /**
+   * Registers this executor with an external shuffle server. This registration is required to
+   * inform the shuffle server about where and how we store our shuffle files.
+   *
+   * @param host Host of shuffle server.
+   * @param port Port of shuffle server.
+   * @param execId This Executor's id.
+   * @param executorInfo Contains all info necessary for the service to find our shuffle files.
+   */
+  public void registerWithShuffleServer(
+      String host,
+      int port,
+      String execId,
+      ExecutorShuffleInfo executorInfo) throws IOException {
+    checkInit();
+    TransportClient client = clientFactory.createUnmanagedClient(host, port);
+    try {
+      ByteBuffer registerMessage = new RegisterExecutor(appId, execId, executorInfo).toByteBuffer();
+      client.sendRpcSync(registerMessage, 5000 /* timeoutMs */);
+    } finally {
+      client.close();
+    }
+  }
+
+  @Override
+  public void close() {
+    clientFactory.close();
+  }
+}
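
A minimal usage sketch (not part of this commit) of the client above. It assumes a TransportConf
has already been built elsewhere (its construction is outside this diff); the app id, executor id,
host, and block ids are placeholders, and SASL is left disabled so no SecretKeyHolder is needed.

import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.shuffle.BlockFetchingListener;
import org.apache.spark.network.shuffle.ExternalShuffleClient;
import org.apache.spark.network.util.TransportConf;

public class ExternalShuffleClientExample {
  static void fetch(TransportConf conf, String host, int port) {
    // saslEnabled and saslEncryptionEnabled are both false, so secretKeyHolder may be null.
    ExternalShuffleClient client = new ExternalShuffleClient(conf, null, false, false);
    client.init("app-20160229-0000");
    client.fetchBlocks(host, port, "exec-1", new String[] {"shuffle_0_5_3"},
      new BlockFetchingListener() {
        @Override
        public void onBlockFetchSuccess(String blockId, ManagedBuffer data) {
          System.out.println("Fetched " + blockId + " (" + data.size() + " bytes)");
        }

        @Override
        public void onBlockFetchFailure(String blockId, Throwable exception) {
          System.err.println("Failed to fetch " + blockId + ": " + exception);
        }
      });
  }
}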

http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
new file mode 100644
index 0000000..1b2ddbf
--- /dev/null
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.spark.network.buffer.ManagedBuffer;
+import org.apache.spark.network.client.ChunkReceivedCallback;
+import org.apache.spark.network.client.RpcResponseCallback;
+import org.apache.spark.network.client.TransportClient;
+import org.apache.spark.network.shuffle.protocol.BlockTransferMessage;
+import org.apache.spark.network.shuffle.protocol.OpenBlocks;
+import org.apache.spark.network.shuffle.protocol.StreamHandle;
+
+/**
+ * Simple wrapper on top of a TransportClient which interprets each chunk as a whole block, and
+ * invokes the BlockFetchingListener appropriately. This class is agnostic to the actual RPC
+ * handler, as long as there is a single "open blocks" message which returns a StreamHandle,
+ * and Java serialization is used.
+ *
+ * Note that this typically corresponds to a
+ * {@link org.apache.spark.network.server.OneForOneStreamManager} on the server side.
+ */
+public class OneForOneBlockFetcher {
+  private final Logger logger = LoggerFactory.getLogger(OneForOneBlockFetcher.class);
+
+  private final TransportClient client;
+  private final OpenBlocks openMessage;
+  private final String[] blockIds;
+  private final BlockFetchingListener listener;
+  private final ChunkReceivedCallback chunkCallback;
+
+  private StreamHandle streamHandle = null;
+
+  public OneForOneBlockFetcher(
+      TransportClient client,
+      String appId,
+      String execId,
+      String[] blockIds,
+      BlockFetchingListener listener) {
+    this.client = client;
+    this.openMessage = new OpenBlocks(appId, execId, blockIds);
+    this.blockIds = blockIds;
+    this.listener = listener;
+    this.chunkCallback = new ChunkCallback();
+  }
+
+  /** Callback invoked on receipt of each chunk. We equate a single chunk to a single block. */
+  private class ChunkCallback implements ChunkReceivedCallback {
+    @Override
+    public void onSuccess(int chunkIndex, ManagedBuffer buffer) {
+      // On receipt of a chunk, pass it upwards as a block.
+      listener.onBlockFetchSuccess(blockIds[chunkIndex], buffer);
+    }
+
+    @Override
+    public void onFailure(int chunkIndex, Throwable e) {
+      // On receipt of a failure, fail every block from chunkIndex onwards.
+      String[] remainingBlockIds = Arrays.copyOfRange(blockIds, chunkIndex, blockIds.length);
+      failRemainingBlocks(remainingBlockIds, e);
+    }
+  }
+
+  /**
+   * Begins the fetching process, calling the listener with every block fetched.
+   * The given message will be serialized with the Java serializer, and the RPC must return a
+   * {@link StreamHandle}. We will send all fetch requests immediately, without throttling.
+   */
+  public void start() {
+    if (blockIds.length == 0) {
+      throw new IllegalArgumentException("Zero-sized blockIds array");
+    }
+
+    client.sendRpc(openMessage.toByteBuffer(), new RpcResponseCallback() {
+      @Override
+      public void onSuccess(ByteBuffer response) {
+        try {
+          streamHandle = (StreamHandle) BlockTransferMessage.Decoder.fromByteBuffer(response);
+          logger.trace("Successfully opened blocks {}, preparing to fetch chunks.", streamHandle);
+
+          // Immediately request all chunks -- we expect that the total size of the request is
+          // reasonable due to higher level chunking in [[ShuffleBlockFetcherIterator]].
+          for (int i = 0; i < streamHandle.numChunks; i++) {
+            client.fetchChunk(streamHandle.streamId, i, chunkCallback);
+          }
+        } catch (Exception e) {
+          logger.error("Failed while starting block fetches after success", e);
+          failRemainingBlocks(blockIds, e);
+        }
+      }
+
+      @Override
+      public void onFailure(Throwable e) {
+        logger.error("Failed while starting block fetches", e);
+        failRemainingBlocks(blockIds, e);
+      }
+    });
+  }
+
+  /** Invokes the "onBlockFetchFailure" callback for every listed block id. */
+  private void failRemainingBlocks(String[] failedBlockIds, Throwable e) {
+    for (String blockId : failedBlockIds) {
+      try {
+        listener.onBlockFetchFailure(blockId, e);
+      } catch (Exception e2) {
+        logger.error("Error in block fetch failure callback", e2);
+      }
+    }
+  }
+}
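
A minimal sketch (not part of this commit) of driving the fetcher above directly, assuming an
already-connected TransportClient obtained from a TransportClientFactory elsewhere; the app id,
executor id, and block ids are illustrative.

import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.client.TransportClient;
import org.apache.spark.network.shuffle.BlockFetchingListener;
import org.apache.spark.network.shuffle.OneForOneBlockFetcher;

public class OneForOneBlockFetcherExample {
  static void fetch(TransportClient client, String appId, String execId) {
    String[] blockIds = {"shuffle_0_0_0", "shuffle_0_1_0"};
    // One OpenBlocks RPC is sent; each chunk returned for the resulting stream is
    // reported to the listener as one whole block.
    new OneForOneBlockFetcher(client, appId, execId, blockIds, new BlockFetchingListener() {
      @Override
      public void onBlockFetchSuccess(String blockId, ManagedBuffer data) {
        System.out.println("Received block " + blockId);
      }

      @Override
      public void onBlockFetchFailure(String blockId, Throwable t) {
        System.err.println("Block " + blockId + " failed: " + t);
      }
    }).start();
  }
}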

http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java
new file mode 100644
index 0000000..4bb0498
--- /dev/null
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.spark.network.buffer.ManagedBuffer;
+import org.apache.spark.network.util.NettyUtils;
+import org.apache.spark.network.util.TransportConf;
+
+/**
+ * Wraps another BlockFetcher with the ability to automatically retry fetches which fail due to
+ * IOExceptions, which we hope are due to transient network conditions.
+ *
+ * This fetcher provides stronger guarantees regarding the parent BlockFetchingListener. In
+ * particular, the listener will be invoked exactly once per blockId, with a success or failure.
+ */
+public class RetryingBlockFetcher {
+
+  /**
+   * Used to initiate the first fetch for all blocks, and subsequently for retrying the fetch on any
+   * remaining blocks.
+   */
+  public static interface BlockFetchStarter {
+    /**
+     * Creates a new BlockFetcher to fetch the given block ids, possibly doing some synchronous
+     * bootstrapping followed by fully asynchronous block fetching.
+     * The BlockFetcher must eventually invoke the Listener on every input blockId, or else this
+     * method must throw an exception.
+     *
+     * This method should always attempt to get a new TransportClient from the
+     * {@link org.apache.spark.network.client.TransportClientFactory} in order to fix connection
+     * issues.
+     */
+    void createAndStart(String[] blockIds, BlockFetchingListener listener) throws IOException;
+  }
+
+  /** Shared executor service used for waiting and retrying. */
+  private static final ExecutorService executorService = Executors.newCachedThreadPool(
+    NettyUtils.createThreadFactory("Block Fetch Retry"));
+
+  private final Logger logger = LoggerFactory.getLogger(RetryingBlockFetcher.class);
+
+  /** Used to initiate new Block Fetches on our remaining blocks. */
+  private final BlockFetchStarter fetchStarter;
+
+  /** Parent listener which we delegate all successful or permanently failed block fetches to. */
+  private final BlockFetchingListener listener;
+
+  /** Max number of times we are allowed to retry. */
+  private final int maxRetries;
+
+  /** Milliseconds to wait before each retry. */
+  private final int retryWaitTime;
+
+  // NOTE:
+  // All of our non-final fields are synchronized under 'this' and should only be accessed/mutated
+  // while inside a synchronized block.
+  /** Number of times we've attempted to retry so far. */
+  private int retryCount = 0;
+
+  /**
+   * Set of all block ids which have not been fetched successfully or have permanently failed
+   * (with a non-IO exception).
+   * A retry involves requesting every outstanding block. Note that since this is a LinkedHashSet,
+   * input ordering is preserved, so we always request blocks in the same order the user provided.
+   */
+  private final LinkedHashSet<String> outstandingBlocksIds;
+
+  /**
+   * The BlockFetchingListener that is active with our current BlockFetcher.
+   * When we start a retry, we immediately replace this with a new Listener, which causes any
+   * old Listeners to ignore all further responses.
+   */
+  private RetryingBlockFetchListener currentListener;
+
+  public RetryingBlockFetcher(
+      TransportConf conf,
+      BlockFetchStarter fetchStarter,
+      String[] blockIds,
+      BlockFetchingListener listener) {
+    this.fetchStarter = fetchStarter;
+    this.listener = listener;
+    this.maxRetries = conf.maxIORetries();
+    this.retryWaitTime = conf.ioRetryWaitTimeMs();
+    this.outstandingBlocksIds = Sets.newLinkedHashSet();
+    Collections.addAll(outstandingBlocksIds, blockIds);
+    this.currentListener = new RetryingBlockFetchListener();
+  }
+
+  /**
+   * Initiates the fetch of all blocks provided in the constructor, with possible retries in the
+   * event of transient IOExceptions.
+   */
+  public void start() {
+    fetchAllOutstanding();
+  }
+
+  /**
+   * Fires off a request to fetch all blocks that have not been fetched successfully or permanently
+   * failed (i.e., by a non-IOException).
+   */
+  private void fetchAllOutstanding() {
+    // Start by retrieving our shared state within a synchronized block.
+    String[] blockIdsToFetch;
+    int numRetries;
+    RetryingBlockFetchListener myListener;
+    synchronized (this) {
+      blockIdsToFetch = outstandingBlocksIds.toArray(new String[outstandingBlocksIds.size()]);
+      numRetries = retryCount;
+      myListener = currentListener;
+    }
+
+    // Now initiate the fetch on all outstanding blocks, possibly initiating a retry if that fails.
+    try {
+      fetchStarter.createAndStart(blockIdsToFetch, myListener);
+    } catch (Exception e) {
+      logger.error(String.format("Exception while beginning fetch of %s outstanding blocks %s",
+        blockIdsToFetch.length, numRetries > 0 ? "(after " + numRetries + " retries)" : ""), e);
+
+      if (shouldRetry(e)) {
+        initiateRetry();
+      } else {
+        for (String bid : blockIdsToFetch) {
+          listener.onBlockFetchFailure(bid, e);
+        }
+      }
+    }
+  }
+
+  /**
+   * Lightweight method which initiates a retry in a different thread. The retry will involve
+   * calling fetchAllOutstanding() after a configured wait time.
+   */
+  private synchronized void initiateRetry() {
+    retryCount += 1;
+    currentListener = new RetryingBlockFetchListener();
+
+    logger.info("Retrying fetch ({}/{}) for {} outstanding blocks after {} ms",
+      retryCount, maxRetries, outstandingBlocksIds.size(), retryWaitTime);
+
+    executorService.submit(new Runnable() {
+      @Override
+      public void run() {
+        Uninterruptibles.sleepUninterruptibly(retryWaitTime, TimeUnit.MILLISECONDS);
+        fetchAllOutstanding();
+      }
+    });
+  }
+
+  /**
+   * Returns true if we should retry due to a block fetch failure. We will retry if and only if
+   * the exception was an IOException and we haven't retried 'maxRetries' times already.
+   */
+  private synchronized boolean shouldRetry(Throwable e) {
+    boolean isIOException = e instanceof IOException
+      || (e.getCause() != null && e.getCause() instanceof IOException);
+    boolean hasRemainingRetries = retryCount < maxRetries;
+    return isIOException && hasRemainingRetries;
+  }
+
+  /**
+   * Our RetryListener intercepts block fetch responses and forwards them to our parent listener.
+   * Note that in the event of a retry, we will immediately replace the 'currentListener' field,
+   * indicating that any responses from non-current Listeners should be ignored.
+   */
+  private class RetryingBlockFetchListener implements BlockFetchingListener {
+    @Override
+    public void onBlockFetchSuccess(String blockId, ManagedBuffer data) {
+      // We will only forward this success message to our parent listener if this block request is
+      // outstanding and we are still the active listener.
+      boolean shouldForwardSuccess = false;
+      synchronized (RetryingBlockFetcher.this) {
+        if (this == currentListener && outstandingBlocksIds.contains(blockId)) {
+          outstandingBlocksIds.remove(blockId);
+          shouldForwardSuccess = true;
+        }
+      }
+
+      // Now actually invoke the parent listener, outside of the synchronized block.
+      if (shouldForwardSuccess) {
+        listener.onBlockFetchSuccess(blockId, data);
+      }
+    }
+
+    @Override
+    public void onBlockFetchFailure(String blockId, Throwable exception) {
+      // We will only forward this failure to our parent listener if this block request is
+      // outstanding, we are still the active listener, AND we cannot retry the fetch.
+      boolean shouldForwardFailure = false;
+      synchronized (RetryingBlockFetcher.this) {
+        if (this == currentListener && outstandingBlocksIds.contains(blockId)) {
+          if (shouldRetry(exception)) {
+            initiateRetry();
+          } else {
+            logger.error(String.format("Failed to fetch block %s, and will not retry (%s retries)",
+              blockId, retryCount), exception);
+            outstandingBlocksIds.remove(blockId);
+            shouldForwardFailure = true;
+          }
+        }
+      }
+
+      // Now actually invoke the parent listener, outside of the synchronized block.
+      if (shouldForwardFailure) {
+        listener.onBlockFetchFailure(blockId, exception);
+      }
+    }
+  }
+}
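
A minimal sketch (not part of this commit) of wiring a BlockFetchStarter into the retrying
fetcher, mirroring what ExternalShuffleClient.fetchBlocks() does earlier in this patch. The
TransportConf and listener are assumed to come from elsewhere; the starter below always throws
an IOException purely to illustrate that such failures trigger up to conf.maxIORetries() retries
spaced conf.ioRetryWaitTimeMs() apart.

import java.io.IOException;

import org.apache.spark.network.shuffle.BlockFetchingListener;
import org.apache.spark.network.shuffle.RetryingBlockFetcher;
import org.apache.spark.network.util.TransportConf;

public class RetryingBlockFetcherExample {
  static void startWithRetries(
      TransportConf conf, String[] blockIds, BlockFetchingListener listener) {
    RetryingBlockFetcher.BlockFetchStarter starter =
      new RetryingBlockFetcher.BlockFetchStarter() {
        @Override
        public void createAndStart(String[] ids, BlockFetchingListener l) throws IOException {
          // A real starter would open a fresh TransportClient and start a
          // OneForOneBlockFetcher here; throwing an IOException instead makes
          // the wrapper schedule a retry for all outstanding blocks.
          throw new IOException("simulated transient network failure");
        }
      };
    new RetryingBlockFetcher(conf, starter, blockIds, listener).start();
  }
}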

http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleClient.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleClient.java
new file mode 100644
index 0000000..f72ab40
--- /dev/null
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleClient.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+import java.io.Closeable;
+
+/** Provides an interface for reading shuffle files, either from an Executor or external service. */
+public abstract class ShuffleClient implements Closeable {
+
+  /**
+   * Initializes the ShuffleClient, specifying this Executor's appId.
+   * Must be called before any other method on the ShuffleClient.
+   */
+  public void init(String appId) { }
+
+  /**
+   * Fetch a sequence of blocks from a remote node asynchronously.
+   *
+   * Note that this API takes a sequence so the implementation can batch requests, and does not
+   * return a future so the underlying implementation can invoke onBlockFetchSuccess as soon as
+   * the data of a block is fetched, rather than waiting for all blocks to be fetched.
+   */
+  public abstract void fetchBlocks(
+      String host,
+      int port,
+      String execId,
+      String[] blockIds,
+      BlockFetchingListener listener);
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java
new file mode 100644
index 0000000..6758203
--- /dev/null
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle.mesos;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.spark.network.client.RpcResponseCallback;
+import org.apache.spark.network.client.TransportClient;
+import org.apache.spark.network.sasl.SecretKeyHolder;
+import org.apache.spark.network.shuffle.ExternalShuffleClient;
+import org.apache.spark.network.shuffle.protocol.mesos.RegisterDriver;
+import org.apache.spark.network.util.TransportConf;
+
+/**
+ * A client for talking to the external shuffle service in Mesos coarse-grained mode.
+ *
+ * This is used by the Spark driver to register with each external shuffle service on the cluster.
+ * The driver has to talk to the service in order to clean up shuffle files reliably after the
+ * application exits; Mesos does not provide a good way to do this on its own, so Spark has to
+ * handle the cleanup itself.
+ */
+public class MesosExternalShuffleClient extends ExternalShuffleClient {
+  private final Logger logger = LoggerFactory.getLogger(MesosExternalShuffleClient.class);
+
+  /**
+   * Creates a Mesos external shuffle client that wraps the {@link ExternalShuffleClient}.
+   * Please refer to docs on {@link ExternalShuffleClient} for more information.
+   */
+  public MesosExternalShuffleClient(
+      TransportConf conf,
+      SecretKeyHolder secretKeyHolder,
+      boolean saslEnabled,
+      boolean saslEncryptionEnabled) {
+    super(conf, secretKeyHolder, saslEnabled, saslEncryptionEnabled);
+  }
+
+  public void registerDriverWithShuffleService(String host, int port) throws IOException {
+    checkInit();
+    ByteBuffer registerDriver = new RegisterDriver(appId).toByteBuffer();
+    TransportClient client = clientFactory.createClient(host, port);
+    client.sendRpc(registerDriver, new RpcResponseCallback() {
+      @Override
+      public void onSuccess(ByteBuffer response) {
+        logger.info("Successfully registered app " + appId + " with external shuffle service.");
+      }
+
+      @Override
+      public void onFailure(Throwable e) {
+        logger.warn("Unable to register app " + appId + " with external shuffle service. " +
+          "Please manually remove shuffle data after driver exit. Error: " + e);
+      }
+    });
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java
new file mode 100644
index 0000000..7fbe338
--- /dev/null
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle.protocol;
+
+import java.nio.ByteBuffer;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
+import org.apache.spark.network.protocol.Encodable;
+import org.apache.spark.network.shuffle.protocol.mesos.RegisterDriver;
+
+/**
+ * Messages handled by the {@link org.apache.spark.network.shuffle.ExternalShuffleBlockHandler}, or
+ * by Spark's NettyBlockTransferService.
+ *
+ * At a high level:
+ *   - OpenBlocks is handled by both services, but the external shuffle service only serves
+ *     shuffle files. It returns a StreamHandle.
+ *   - UploadBlock is only handled by the NettyBlockTransferService.
+ *   - RegisterExecutor is only handled by the external shuffle service.
+ */
+public abstract class BlockTransferMessage implements Encodable {
+  protected abstract Type type();
+
+  /** Preceding every serialized message is its type, which allows us to deserialize it. */
+  public static enum Type {
+    OPEN_BLOCKS(0), UPLOAD_BLOCK(1), REGISTER_EXECUTOR(2), STREAM_HANDLE(3), REGISTER_DRIVER(4);
+
+    private final byte id;
+
+    private Type(int id) {
+      assert id < 128 : "Cannot have more than 128 message types";
+      this.id = (byte) id;
+    }
+
+    public byte id() { return id; }
+  }
+
+  // NB: Java does not support static methods in interfaces, so we must put this in a static class.
+  public static class Decoder {
+    /** Deserializes the 'type' byte followed by the message itself. */
+    public static BlockTransferMessage fromByteBuffer(ByteBuffer msg) {
+      ByteBuf buf = Unpooled.wrappedBuffer(msg);
+      byte type = buf.readByte();
+      switch (type) {
+        case 0: return OpenBlocks.decode(buf);
+        case 1: return UploadBlock.decode(buf);
+        case 2: return RegisterExecutor.decode(buf);
+        case 3: return StreamHandle.decode(buf);
+        case 4: return RegisterDriver.decode(buf);
+        default: throw new IllegalArgumentException("Unknown message type: " + type);
+      }
+    }
+  }
+
+  /** Serializes the 'type' byte followed by the message itself. */
+  public ByteBuffer toByteBuffer() {
+    // Allow room for encoded message, plus the type byte
+    ByteBuf buf = Unpooled.buffer(encodedLength() + 1);
+    buf.writeByte(type().id);
+    encode(buf);
+    assert buf.writableBytes() == 0 : "Writable bytes remain: " + buf.writableBytes();
+    return buf.nioBuffer();
+  }
+}
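
A minimal round-trip sketch (not part of this commit) of the type-byte framing described above,
using the OpenBlocks message defined later in this patch; the ids are placeholders.

import java.nio.ByteBuffer;

import org.apache.spark.network.shuffle.protocol.BlockTransferMessage;
import org.apache.spark.network.shuffle.protocol.OpenBlocks;

public class BlockTransferMessageRoundTrip {
  public static void main(String[] args) {
    OpenBlocks open = new OpenBlocks("app-1", "exec-1", new String[] {"shuffle_0_0_0"});

    // toByteBuffer() writes the type byte (OPEN_BLOCKS = 0) followed by the encoded body.
    ByteBuffer wire = open.toByteBuffer();

    // The Decoder reads the type byte back and dispatches to OpenBlocks.decode().
    BlockTransferMessage decoded = BlockTransferMessage.Decoder.fromByteBuffer(wire);
    System.out.println(decoded.equals(open)); // true
  }
}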

http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/ExecutorShuffleInfo.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/ExecutorShuffleInfo.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/ExecutorShuffleInfo.java
new file mode 100644
index 0000000..102d4ef
--- /dev/null
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/ExecutorShuffleInfo.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle.protocol;
+
+import java.util.Arrays;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Objects;
+import io.netty.buffer.ByteBuf;
+
+import org.apache.spark.network.protocol.Encodable;
+import org.apache.spark.network.protocol.Encoders;
+
+/** Contains all configuration necessary for locating the shuffle files of an executor. */
+public class ExecutorShuffleInfo implements Encodable {
+  /** The base set of local directories that the executor stores its shuffle files in. */
+  public final String[] localDirs;
+  /** Number of subdirectories created within each localDir. */
+  public final int subDirsPerLocalDir;
+  /** Shuffle manager (SortShuffleManager or HashShuffleManager) that the executor is using. */
+  public final String shuffleManager;
+
+  @JsonCreator
+  public ExecutorShuffleInfo(
+      @JsonProperty("localDirs") String[] localDirs,
+      @JsonProperty("subDirsPerLocalDir") int subDirsPerLocalDir,
+      @JsonProperty("shuffleManager") String shuffleManager) {
+    this.localDirs = localDirs;
+    this.subDirsPerLocalDir = subDirsPerLocalDir;
+    this.shuffleManager = shuffleManager;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(subDirsPerLocalDir, shuffleManager) * 41 + Arrays.hashCode(localDirs);
+  }
+
+  @Override
+  public String toString() {
+    return Objects.toStringHelper(this)
+      .add("localDirs", Arrays.toString(localDirs))
+      .add("subDirsPerLocalDir", subDirsPerLocalDir)
+      .add("shuffleManager", shuffleManager)
+      .toString();
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (other != null && other instanceof ExecutorShuffleInfo) {
+      ExecutorShuffleInfo o = (ExecutorShuffleInfo) other;
+      return Arrays.equals(localDirs, o.localDirs)
+        && Objects.equal(subDirsPerLocalDir, o.subDirsPerLocalDir)
+        && Objects.equal(shuffleManager, o.shuffleManager);
+    }
+    return false;
+  }
+
+  @Override
+  public int encodedLength() {
+    return Encoders.StringArrays.encodedLength(localDirs)
+        + 4 // int
+        + Encoders.Strings.encodedLength(shuffleManager);
+  }
+
+  @Override
+  public void encode(ByteBuf buf) {
+    Encoders.StringArrays.encode(buf, localDirs);
+    buf.writeInt(subDirsPerLocalDir);
+    Encoders.Strings.encode(buf, shuffleManager);
+  }
+
+  public static ExecutorShuffleInfo decode(ByteBuf buf) {
+    String[] localDirs = Encoders.StringArrays.decode(buf);
+    int subDirsPerLocalDir = buf.readInt();
+    String shuffleManager = Encoders.Strings.decode(buf);
+    return new ExecutorShuffleInfo(localDirs, subDirsPerLocalDir, shuffleManager);
+  }
+}
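
A minimal sketch (not part of this commit) of the wire encoding round trip for
ExecutorShuffleInfo; the directory layout values are illustrative.

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;

public class ExecutorShuffleInfoRoundTrip {
  public static void main(String[] args) {
    ExecutorShuffleInfo info = new ExecutorShuffleInfo(
      new String[] {"/tmp/spark-local-0", "/tmp/spark-local-1"}, 64, "sort");

    // encodedLength() sizes the buffer exactly; encode()/decode() round-trip the fields.
    ByteBuf buf = Unpooled.buffer(info.encodedLength());
    info.encode(buf);
    ExecutorShuffleInfo decoded = ExecutorShuffleInfo.decode(buf);
    System.out.println(decoded.equals(info)); // true
    buf.release();
  }
}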

http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/OpenBlocks.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/OpenBlocks.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/OpenBlocks.java
new file mode 100644
index 0000000..ce954b8
--- /dev/null
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/OpenBlocks.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle.protocol;
+
+import java.util.Arrays;
+
+import com.google.common.base.Objects;
+import io.netty.buffer.ByteBuf;
+
+import org.apache.spark.network.protocol.Encoders;
+
+// Needed by ScalaDoc. See SPARK-7726
+import static org.apache.spark.network.shuffle.protocol.BlockTransferMessage.Type;
+
+/** Request to read a set of blocks. Returns {@link StreamHandle}. */
+public class OpenBlocks extends BlockTransferMessage {
+  public final String appId;
+  public final String execId;
+  public final String[] blockIds;
+
+  public OpenBlocks(String appId, String execId, String[] blockIds) {
+    this.appId = appId;
+    this.execId = execId;
+    this.blockIds = blockIds;
+  }
+
+  @Override
+  protected Type type() { return Type.OPEN_BLOCKS; }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(appId, execId) * 41 + Arrays.hashCode(blockIds);
+  }
+
+  @Override
+  public String toString() {
+    return Objects.toStringHelper(this)
+      .add("appId", appId)
+      .add("execId", execId)
+      .add("blockIds", Arrays.toString(blockIds))
+      .toString();
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (other != null && other instanceof OpenBlocks) {
+      OpenBlocks o = (OpenBlocks) other;
+      return Objects.equal(appId, o.appId)
+        && Objects.equal(execId, o.execId)
+        && Arrays.equals(blockIds, o.blockIds);
+    }
+    return false;
+  }
+
+  @Override
+  public int encodedLength() {
+    return Encoders.Strings.encodedLength(appId)
+      + Encoders.Strings.encodedLength(execId)
+      + Encoders.StringArrays.encodedLength(blockIds);
+  }
+
+  @Override
+  public void encode(ByteBuf buf) {
+    Encoders.Strings.encode(buf, appId);
+    Encoders.Strings.encode(buf, execId);
+    Encoders.StringArrays.encode(buf, blockIds);
+  }
+
+  public static OpenBlocks decode(ByteBuf buf) {
+    String appId = Encoders.Strings.decode(buf);
+    String execId = Encoders.Strings.decode(buf);
+    String[] blockIds = Encoders.StringArrays.decode(buf);
+    return new OpenBlocks(appId, execId, blockIds);
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/RegisterExecutor.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/RegisterExecutor.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/RegisterExecutor.java
new file mode 100644
index 0000000..167ef33
--- /dev/null
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/RegisterExecutor.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle.protocol;
+
+import com.google.common.base.Objects;
+import io.netty.buffer.ByteBuf;
+
+import org.apache.spark.network.protocol.Encoders;
+
+// Needed by ScalaDoc. See SPARK-7726
+import static org.apache.spark.network.shuffle.protocol.BlockTransferMessage.Type;
+
+/**
+ * Initial registration message between an executor and its local shuffle server.
+ * Returns nothing (empty byte array).
+ */
+public class RegisterExecutor extends BlockTransferMessage {
+  public final String appId;
+  public final String execId;
+  public final ExecutorShuffleInfo executorInfo;
+
+  public RegisterExecutor(
+      String appId,
+      String execId,
+      ExecutorShuffleInfo executorInfo) {
+    this.appId = appId;
+    this.execId = execId;
+    this.executorInfo = executorInfo;
+  }
+
+  @Override
+  protected Type type() { return Type.REGISTER_EXECUTOR; }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(appId, execId, executorInfo);
+  }
+
+  @Override
+  public String toString() {
+    return Objects.toStringHelper(this)
+      .add("appId", appId)
+      .add("execId", execId)
+      .add("executorInfo", executorInfo)
+      .toString();
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (other != null && other instanceof RegisterExecutor) {
+      RegisterExecutor o = (RegisterExecutor) other;
+      return Objects.equal(appId, o.appId)
+        && Objects.equal(execId, o.execId)
+        && Objects.equal(executorInfo, o.executorInfo);
+    }
+    return false;
+  }
+
+  @Override
+  public int encodedLength() {
+    return Encoders.Strings.encodedLength(appId)
+      + Encoders.Strings.encodedLength(execId)
+      + executorInfo.encodedLength();
+  }
+
+  @Override
+  public void encode(ByteBuf buf) {
+    Encoders.Strings.encode(buf, appId);
+    Encoders.Strings.encode(buf, execId);
+    executorInfo.encode(buf);
+  }
+
+  public static RegisterExecutor decode(ByteBuf buf) {
+    String appId = Encoders.Strings.decode(buf);
+    String execId = Encoders.Strings.decode(buf);
+    ExecutorShuffleInfo executorShuffleInfo = ExecutorShuffleInfo.decode(buf);
+    return new RegisterExecutor(appId, execId, executorShuffleInfo);
+  }
+}
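
As a rough illustration (assuming an already-connected, authenticated TransportClient and made-up app/executor ids), this is the executor-side registration call; SaslIntegrationSuite further down issues the same RPC:

import org.apache.spark.network.client.TransportClient;
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
import org.apache.spark.network.shuffle.protocol.RegisterExecutor;

public class RegisterExecutorSketch {
  // Tells the shuffle server where this executor keeps its shuffle files.
  static void register(TransportClient client, ExecutorShuffleInfo layout) {
    RegisterExecutor msg = new RegisterExecutor("app-1", "exec-0", layout);
    // The server acknowledges with an empty byte array on success.
    client.sendRpcSync(msg.toByteBuffer(), 10_000 /* timeout ms */);
  }
}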

http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/StreamHandle.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/StreamHandle.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/StreamHandle.java
new file mode 100644
index 0000000..1915295
--- /dev/null
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/StreamHandle.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle.protocol;
+
+import com.google.common.base.Objects;
+import io.netty.buffer.ByteBuf;
+
+// Needed by ScalaDoc. See SPARK-7726
+import static org.apache.spark.network.shuffle.protocol.BlockTransferMessage.Type;
+
+/**
+ * Identifier for a fixed number of chunks to read from a stream created by an "open blocks"
+ * message. This is used by {@link org.apache.spark.network.shuffle.OneForOneBlockFetcher}.
+ */
+public class StreamHandle extends BlockTransferMessage {
+  public final long streamId;
+  public final int numChunks;
+
+  public StreamHandle(long streamId, int numChunks) {
+    this.streamId = streamId;
+    this.numChunks = numChunks;
+  }
+
+  @Override
+  protected Type type() { return Type.STREAM_HANDLE; }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(streamId, numChunks);
+  }
+
+  @Override
+  public String toString() {
+    return Objects.toStringHelper(this)
+      .add("streamId", streamId)
+      .add("numChunks", numChunks)
+      .toString();
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (other != null && other instanceof StreamHandle) {
+      StreamHandle o = (StreamHandle) other;
+      return Objects.equal(streamId, o.streamId)
+        && Objects.equal(numChunks, o.numChunks);
+    }
+    return false;
+  }
+
+  @Override
+  public int encodedLength() {
+    return 8 + 4; // 8 bytes for the long streamId + 4 for the int numChunks

+  }
+
+  @Override
+  public void encode(ByteBuf buf) {
+    buf.writeLong(streamId);
+    buf.writeInt(numChunks);
+  }
+
+  public static StreamHandle decode(ByteBuf buf) {
+    long streamId = buf.readLong();
+    int numChunks = buf.readInt();
+    return new StreamHandle(streamId, numChunks);
+  }
+}
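
To make the open-then-fetch protocol concrete, here is a hedged client-side sketch (a connected TransportClient, a registered executor, and the block ids are all assumed); this is, in outline, the flow that OneForOneBlockFetcher drives:

import java.nio.ByteBuffer;

import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.client.ChunkReceivedCallback;
import org.apache.spark.network.client.TransportClient;
import org.apache.spark.network.shuffle.protocol.BlockTransferMessage;
import org.apache.spark.network.shuffle.protocol.OpenBlocks;
import org.apache.spark.network.shuffle.protocol.StreamHandle;

public class OpenThenFetchSketch {
  // Opens the named blocks, then fetches every chunk of the resulting stream.
  static void fetchAll(TransportClient client, String appId, String execId, String[] blockIds) {
    OpenBlocks open = new OpenBlocks(appId, execId, blockIds);
    ByteBuffer response = client.sendRpcSync(open.toByteBuffer(), 10_000);
    StreamHandle handle = (StreamHandle) BlockTransferMessage.Decoder.fromByteBuffer(response);

    // The handle names a stream with a fixed number of chunks, one per requested block id.
    for (int i = 0; i < handle.numChunks; i++) {
      client.fetchChunk(handle.streamId, i, new ChunkReceivedCallback() {
        @Override
        public void onSuccess(int chunkIndex, ManagedBuffer buffer) { /* consume block data */ }

        @Override
        public void onFailure(int chunkIndex, Throwable t) { /* report or retry */ }
      });
    }
  }
}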

http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/UploadBlock.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/UploadBlock.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/UploadBlock.java
new file mode 100644
index 0000000..3caed59
--- /dev/null
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/UploadBlock.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle.protocol;
+
+import java.util.Arrays;
+
+import com.google.common.base.Objects;
+import io.netty.buffer.ByteBuf;
+
+import org.apache.spark.network.protocol.Encoders;
+
+// Needed by ScalaDoc. See SPARK-7726
+import static org.apache.spark.network.shuffle.protocol.BlockTransferMessage.Type;
+
+
+/** Request to upload a block with a certain StorageLevel. Returns nothing (empty byte array). */
+public class UploadBlock extends BlockTransferMessage {
+  public final String appId;
+  public final String execId;
+  public final String blockId;
+  // TODO: StorageLevel is serialized separately in here because StorageLevel is not available in
+  // this package. We should avoid this hack.
+  public final byte[] metadata;
+  public final byte[] blockData;
+
+  /**
+   * @param metadata Meta-information about the block, typically its StorageLevel.
+   * @param blockData The actual block's bytes.
+   */
+  public UploadBlock(
+      String appId,
+      String execId,
+      String blockId,
+      byte[] metadata,
+      byte[] blockData) {
+    this.appId = appId;
+    this.execId = execId;
+    this.blockId = blockId;
+    this.metadata = metadata;
+    this.blockData = blockData;
+  }
+
+  @Override
+  protected Type type() { return Type.UPLOAD_BLOCK; }
+
+  @Override
+  public int hashCode() {
+    int objectsHashCode = Objects.hashCode(appId, execId, blockId);
+    return (objectsHashCode * 41 + Arrays.hashCode(metadata)) * 41 + Arrays.hashCode(blockData);
+  }
+
+  @Override
+  public String toString() {
+    return Objects.toStringHelper(this)
+      .add("appId", appId)
+      .add("execId", execId)
+      .add("blockId", blockId)
+      .add("metadata size", metadata.length)
+      .add("block size", blockData.length)
+      .toString();
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (other != null && other instanceof UploadBlock) {
+      UploadBlock o = (UploadBlock) other;
+      return Objects.equal(appId, o.appId)
+        && Objects.equal(execId, o.execId)
+        && Objects.equal(blockId, o.blockId)
+        && Arrays.equals(metadata, o.metadata)
+        && Arrays.equals(blockData, o.blockData);
+    }
+    return false;
+  }
+
+  @Override
+  public int encodedLength() {
+    return Encoders.Strings.encodedLength(appId)
+      + Encoders.Strings.encodedLength(execId)
+      + Encoders.Strings.encodedLength(blockId)
+      + Encoders.ByteArrays.encodedLength(metadata)
+      + Encoders.ByteArrays.encodedLength(blockData);
+  }
+
+  @Override
+  public void encode(ByteBuf buf) {
+    Encoders.Strings.encode(buf, appId);
+    Encoders.Strings.encode(buf, execId);
+    Encoders.Strings.encode(buf, blockId);
+    Encoders.ByteArrays.encode(buf, metadata);
+    Encoders.ByteArrays.encode(buf, blockData);
+  }
+
+  public static UploadBlock decode(ByteBuf buf) {
+    String appId = Encoders.Strings.decode(buf);
+    String execId = Encoders.Strings.decode(buf);
+    String blockId = Encoders.Strings.decode(buf);
+    byte[] metadata = Encoders.ByteArrays.decode(buf);
+    byte[] blockData = Encoders.ByteArrays.decode(buf);
+    return new UploadBlock(appId, execId, blockId, metadata, blockData);
+  }
+}
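
A corresponding sketch for the push path, under the same assumptions (a connected TransportClient, illustrative ids). The block bytes travel inline in the message, and the metadata field is opaque at this layer; per the TODO above it carries the serialized StorageLevel:

import org.apache.spark.network.client.TransportClient;
import org.apache.spark.network.shuffle.protocol.UploadBlock;

public class UploadBlockSketch {
  // Pushes one block's bytes to the remote block transfer server.
  static void upload(TransportClient client, byte[] storageLevelBytes, byte[] blockBytes) {
    UploadBlock msg = new UploadBlock("app-1", "exec-0", "rdd_7_3", storageLevelBytes, blockBytes);
    client.sendRpcSync(msg.toByteBuffer(), 10_000);  // empty response on success
  }
}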

http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/mesos/RegisterDriver.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/mesos/RegisterDriver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/mesos/RegisterDriver.java
new file mode 100644
index 0000000..94a61d6
--- /dev/null
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/mesos/RegisterDriver.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle.protocol.mesos;
+
+import com.google.common.base.Objects;
+import io.netty.buffer.ByteBuf;
+
+import org.apache.spark.network.protocol.Encoders;
+import org.apache.spark.network.shuffle.protocol.BlockTransferMessage;
+
+// Needed by ScalaDoc. See SPARK-7726
+import static org.apache.spark.network.shuffle.protocol.BlockTransferMessage.Type;
+
+/**
+ * A message sent from the driver to register with the MesosExternalShuffleService.
+ */
+public class RegisterDriver extends BlockTransferMessage {
+  private final String appId;
+
+  public RegisterDriver(String appId) {
+    this.appId = appId;
+  }
+
+  public String getAppId() { return appId; }
+
+  @Override
+  protected Type type() { return Type.REGISTER_DRIVER; }
+
+  @Override
+  public int encodedLength() {
+    return Encoders.Strings.encodedLength(appId);
+  }
+
+  @Override
+  public void encode(ByteBuf buf) {
+    Encoders.Strings.encode(buf, appId);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(appId);
+  }
+
+  public static RegisterDriver decode(ByteBuf buf) {
+    String appId = Encoders.Strings.decode(buf);
+    return new RegisterDriver(appId);
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java
new file mode 100644
index 0000000..0ea631e
--- /dev/null
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.sasl;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicReference;
+
+import com.google.common.collect.Lists;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+import org.apache.spark.network.TestUtils;
+import org.apache.spark.network.TransportContext;
+import org.apache.spark.network.buffer.ManagedBuffer;
+import org.apache.spark.network.client.ChunkReceivedCallback;
+import org.apache.spark.network.client.RpcResponseCallback;
+import org.apache.spark.network.client.TransportClient;
+import org.apache.spark.network.client.TransportClientBootstrap;
+import org.apache.spark.network.client.TransportClientFactory;
+import org.apache.spark.network.server.OneForOneStreamManager;
+import org.apache.spark.network.server.RpcHandler;
+import org.apache.spark.network.server.StreamManager;
+import org.apache.spark.network.server.TransportServer;
+import org.apache.spark.network.server.TransportServerBootstrap;
+import org.apache.spark.network.shuffle.BlockFetchingListener;
+import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler;
+import org.apache.spark.network.shuffle.ExternalShuffleBlockResolver;
+import org.apache.spark.network.shuffle.OneForOneBlockFetcher;
+import org.apache.spark.network.shuffle.protocol.BlockTransferMessage;
+import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
+import org.apache.spark.network.shuffle.protocol.OpenBlocks;
+import org.apache.spark.network.shuffle.protocol.RegisterExecutor;
+import org.apache.spark.network.shuffle.protocol.StreamHandle;
+import org.apache.spark.network.util.JavaUtils;
+import org.apache.spark.network.util.SystemPropertyConfigProvider;
+import org.apache.spark.network.util.TransportConf;
+
+public class SaslIntegrationSuite {
+
+  // Use a long timeout to account for slow / overloaded build machines. In the normal case,
+  // tests should finish way before the timeout expires.
+  private static final long TIMEOUT_MS = 10_000;
+
+  static TransportServer server;
+  static TransportConf conf;
+  static TransportContext context;
+  static SecretKeyHolder secretKeyHolder;
+
+  TransportClientFactory clientFactory;
+
+  @BeforeClass
+  public static void beforeAll() throws IOException {
+    conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
+    context = new TransportContext(conf, new TestRpcHandler());
+
+    secretKeyHolder = mock(SecretKeyHolder.class);
+    when(secretKeyHolder.getSaslUser(eq("app-1"))).thenReturn("app-1");
+    when(secretKeyHolder.getSecretKey(eq("app-1"))).thenReturn("app-1");
+    when(secretKeyHolder.getSaslUser(eq("app-2"))).thenReturn("app-2");
+    when(secretKeyHolder.getSecretKey(eq("app-2"))).thenReturn("app-2");
+    when(secretKeyHolder.getSaslUser(anyString())).thenReturn("other-app");
+    when(secretKeyHolder.getSecretKey(anyString())).thenReturn("correct-password");
+
+    TransportServerBootstrap bootstrap = new SaslServerBootstrap(conf, secretKeyHolder);
+    server = context.createServer(Arrays.asList(bootstrap));
+  }
+
+
+  @AfterClass
+  public static void afterAll() {
+    server.close();
+  }
+
+  @After
+  public void afterEach() {
+    if (clientFactory != null) {
+      clientFactory.close();
+      clientFactory = null;
+    }
+  }
+
+  @Test
+  public void testGoodClient() throws IOException {
+    clientFactory = context.createClientFactory(
+      Lists.<TransportClientBootstrap>newArrayList(
+        new SaslClientBootstrap(conf, "app-1", secretKeyHolder)));
+
+    TransportClient client = clientFactory.createClient(TestUtils.getLocalHost(), server.getPort());
+    String msg = "Hello, World!";
+    ByteBuffer resp = client.sendRpcSync(JavaUtils.stringToBytes(msg), TIMEOUT_MS);
+    assertEquals(msg, JavaUtils.bytesToString(resp));
+  }
+
+  @Test
+  public void testBadClient() {
+    SecretKeyHolder badKeyHolder = mock(SecretKeyHolder.class);
+    when(badKeyHolder.getSaslUser(anyString())).thenReturn("other-app");
+    when(badKeyHolder.getSecretKey(anyString())).thenReturn("wrong-password");
+    clientFactory = context.createClientFactory(
+      Lists.<TransportClientBootstrap>newArrayList(
+        new SaslClientBootstrap(conf, "unknown-app", badKeyHolder)));
+
+    try {
+      // Bootstrap should fail on startup.
+      clientFactory.createClient(TestUtils.getLocalHost(), server.getPort());
+      fail("Connection should have failed.");
+    } catch (Exception e) {
+      assertTrue(e.getMessage(), e.getMessage().contains("Mismatched response"));
+    }
+  }
+
+  @Test
+  public void testNoSaslClient() throws IOException {
+    clientFactory = context.createClientFactory(
+      Lists.<TransportClientBootstrap>newArrayList());
+
+    TransportClient client = clientFactory.createClient(TestUtils.getLocalHost(), server.getPort());
+    try {
+      client.sendRpcSync(ByteBuffer.allocate(13), TIMEOUT_MS);
+      fail("Should have failed");
+    } catch (Exception e) {
+      assertTrue(e.getMessage(), e.getMessage().contains("Expected SaslMessage"));
+    }
+
+    try {
+      // Guessing the right tag byte doesn't magically get you in...
+      client.sendRpcSync(ByteBuffer.wrap(new byte[] { (byte) 0xEA }), TIMEOUT_MS);
+      fail("Should have failed");
+    } catch (Exception e) {
+      assertTrue(e.getMessage(), e.getMessage().contains("java.lang.IndexOutOfBoundsException"));
+    }
+  }
+
+  @Test
+  public void testNoSaslServer() {
+    RpcHandler handler = new TestRpcHandler();
+    TransportContext context = new TransportContext(conf, handler);
+    clientFactory = context.createClientFactory(
+      Lists.<TransportClientBootstrap>newArrayList(
+        new SaslClientBootstrap(conf, "app-1", secretKeyHolder)));
+    TransportServer server = context.createServer();
+    try {
+      clientFactory.createClient(TestUtils.getLocalHost(), server.getPort());
+    } catch (Exception e) {
+      assertTrue(e.getMessage(), e.getMessage().contains("Digest-challenge format violation"));
+    } finally {
+      server.close();
+    }
+  }
+
+  /**
+   * This test does not exercise SASL negotiation itself; it verifies that the shuffle service
+   * performs the correct authorization checks based on the SASL authentication data.
+   */
+  @Test
+  public void testAppIsolation() throws Exception {
+    // Start a new server with the correct RPC handler to serve block data.
+    ExternalShuffleBlockResolver blockResolver = mock(ExternalShuffleBlockResolver.class);
+    ExternalShuffleBlockHandler blockHandler = new ExternalShuffleBlockHandler(
+      new OneForOneStreamManager(), blockResolver);
+    TransportServerBootstrap bootstrap = new SaslServerBootstrap(conf, secretKeyHolder);
+    TransportContext blockServerContext = new TransportContext(conf, blockHandler);
+    TransportServer blockServer = blockServerContext.createServer(Arrays.asList(bootstrap));
+
+    TransportClient client1 = null;
+    TransportClient client2 = null;
+    TransportClientFactory clientFactory2 = null;
+    try {
+      // Create a client, and make a request to fetch blocks from a different app.
+      clientFactory = blockServerContext.createClientFactory(
+        Lists.<TransportClientBootstrap>newArrayList(
+          new SaslClientBootstrap(conf, "app-1", secretKeyHolder)));
+      client1 = clientFactory.createClient(TestUtils.getLocalHost(),
+        blockServer.getPort());
+
+      final AtomicReference<Throwable> exception = new AtomicReference<>();
+
+      BlockFetchingListener listener = new BlockFetchingListener() {
+        @Override
+        public synchronized void onBlockFetchSuccess(String blockId, ManagedBuffer data) {
+          notifyAll();
+        }
+
+        @Override
+        public synchronized void onBlockFetchFailure(String blockId, Throwable t) {
+          exception.set(t);
+          notifyAll();
+        }
+      };
+
+      String[] blockIds = new String[] { "shuffle_2_3_4", "shuffle_6_7_8" };
+      OneForOneBlockFetcher fetcher = new OneForOneBlockFetcher(client1, "app-2", "0",
+        blockIds, listener);
+      synchronized (listener) {
+        fetcher.start();
+        listener.wait();
+      }
+      checkSecurityException(exception.get());
+
+      // Register an executor so that the next steps work.
+      ExecutorShuffleInfo executorInfo = new ExecutorShuffleInfo(
+        new String[] { System.getProperty("java.io.tmpdir") }, 1, "sort");
+      RegisterExecutor regmsg = new RegisterExecutor("app-1", "0", executorInfo);
+      client1.sendRpcSync(regmsg.toByteBuffer(), TIMEOUT_MS);
+
+      // Make a successful request to fetch blocks, which creates a new stream. But do not actually
+      // fetch any blocks, to keep the stream open.
+      OpenBlocks openMessage = new OpenBlocks("app-1", "0", blockIds);
+      ByteBuffer response = client1.sendRpcSync(openMessage.toByteBuffer(), TIMEOUT_MS);
+      StreamHandle stream = (StreamHandle) BlockTransferMessage.Decoder.fromByteBuffer(response);
+      long streamId = stream.streamId;
+
+      // Create a second client, authenticated with a different app ID, and try to read from
+      // the stream created for the previous app.
+      clientFactory2 = blockServerContext.createClientFactory(
+        Lists.<TransportClientBootstrap>newArrayList(
+          new SaslClientBootstrap(conf, "app-2", secretKeyHolder)));
+      client2 = clientFactory2.createClient(TestUtils.getLocalHost(),
+        blockServer.getPort());
+
+      ChunkReceivedCallback callback = new ChunkReceivedCallback() {
+        @Override
+        public synchronized void onSuccess(int chunkIndex, ManagedBuffer buffer) {
+          notifyAll();
+        }
+
+        @Override
+        public synchronized void onFailure(int chunkIndex, Throwable t) {
+          exception.set(t);
+          notifyAll();
+        }
+      };
+
+      exception.set(null);
+      synchronized (callback) {
+        client2.fetchChunk(streamId, 0, callback);
+        callback.wait();
+      }
+      checkSecurityException(exception.get());
+    } finally {
+      if (client1 != null) {
+        client1.close();
+      }
+      if (client2 != null) {
+        client2.close();
+      }
+      if (clientFactory2 != null) {
+        clientFactory2.close();
+      }
+      blockServer.close();
+    }
+  }
+
+  /** RPC handler which simply responds with the message it received. */
+  public static class TestRpcHandler extends RpcHandler {
+    @Override
+    public void receive(TransportClient client, ByteBuffer message, RpcResponseCallback callback) {
+      callback.onSuccess(message);
+    }
+
+    @Override
+    public StreamManager getStreamManager() {
+      return new OneForOneStreamManager();
+    }
+  }
+
+  private void checkSecurityException(Throwable t) {
+    assertNotNull("No exception was caught.", t);
+    assertTrue("Expected SecurityException.",
+      t.getMessage().contains(SecurityException.class.getName()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/BlockTransferMessagesSuite.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/BlockTransferMessagesSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/BlockTransferMessagesSuite.java
new file mode 100644
index 0000000..86c8609
--- /dev/null
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/BlockTransferMessagesSuite.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+import org.apache.spark.network.shuffle.protocol.*;
+
+/** Verifies that all BlockTransferMessages can be serialized correctly. */
+public class BlockTransferMessagesSuite {
+  @Test
+  public void serializeOpenShuffleBlocks() {
+    checkSerializeDeserialize(new OpenBlocks("app-1", "exec-2", new String[] { "b1", "b2" }));
+    checkSerializeDeserialize(new RegisterExecutor("app-1", "exec-2", new ExecutorShuffleInfo(
+      new String[] { "/local1", "/local2" }, 32, "MyShuffleManager")));
+    checkSerializeDeserialize(new UploadBlock("app-1", "exec-2", "block-3", new byte[] { 1, 2 },
+      new byte[] { 4, 5, 6, 7} ));
+    checkSerializeDeserialize(new StreamHandle(12345, 16));
+  }
+
+  private void checkSerializeDeserialize(BlockTransferMessage msg) {
+    BlockTransferMessage msg2 = BlockTransferMessage.Decoder.fromByteBuffer(msg.toByteBuffer());
+    assertEquals(msg, msg2);
+    assertEquals(msg.hashCode(), msg2.hashCode());
+    assertEquals(msg.toString(), msg2.toString());
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org