Posted to commits@spark.apache.org by an...@apache.org on 2014/11/06 00:42:12 UTC

git commit: [SPARK-3797] Run external shuffle service in Yarn NM

Repository: spark
Updated Branches:
  refs/heads/master f37817b18 -> 61a5cced0


[SPARK-3797] Run external shuffle service in Yarn NM

This creates a new module `network/yarn` that depends on the `network/shuffle` module recently created in #3001. This PR introduces a custom Yarn auxiliary service that runs the external shuffle service. As of the changes here, this shuffle service is required for using dynamic allocation with Spark.

This is still WIP mainly because it doesn't handle security yet. I have tested this on a stable Yarn cluster.
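
For reference, enabling this service on the NodeManager side goes through the standard Hadoop auxiliary-service configuration. A minimal sketch of the relevant yarn-site.xml entries, using the service name `spark_shuffle` and the class introduced in this patch (append to any existing aux-services value, e.g. mapreduce_shuffle):

    <property>
      <name>yarn.nodemanager.aux-services</name>
      <value>spark_shuffle</value>
    </property>
    <property>
      <name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
      <value>org.apache.spark.network.yarn.YarnShuffleService</value>
    </property>

The spark-network-yarn jar (together with its network-common and network-shuffle dependencies) must be on the NodeManager classpath for this class to load.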

Author: Andrew Or <an...@databricks.com>

Closes #3082 from andrewor14/yarn-shuffle-service and squashes the following commits:

ef3ddae [Andrew Or] Merge branch 'master' of github.com:apache/spark into yarn-shuffle-service
0ee67a2 [Andrew Or] Minor wording suggestions
1c66046 [Andrew Or] Remove unused provided dependencies
0eb6233 [Andrew Or] Merge branch 'master' of github.com:apache/spark into yarn-shuffle-service
6489db5 [Andrew Or] Try catch at the right places
7b71d8f [Andrew Or] Add detailed java docs + reword a few comments
d1124e4 [Andrew Or] Add security to shuffle service (INCOMPLETE)
5f8a96f [Andrew Or] Merge branch 'master' of github.com:apache/spark into yarn-shuffle-service
9b6e058 [Andrew Or] Address various feedback
f48b20c [Andrew Or] Fix tests again
f39daa6 [Andrew Or] Do not make network-yarn an assembly module
761f58a [Andrew Or] Merge branch 'master' of github.com:apache/spark into yarn-shuffle-service
15a5b37 [Andrew Or] Fix build for Hadoop 1.x
baff916 [Andrew Or] Fix tests
5bf9b7e [Andrew Or] Address a few minor comments
5b419b8 [Andrew Or] Add missing license header
804e7ff [Andrew Or] Include the Yarn shuffle service jar in the distribution
cd076a4 [Andrew Or] Require external shuffle service for dynamic allocation
ea764e0 [Andrew Or] Connect to Yarn shuffle service only if it's enabled
1bf5109 [Andrew Or] Use the shuffle service port specified through hadoop config
b4b1f0c [Andrew Or] 4 tabs -> 2 tabs
43dcb96 [Andrew Or] First cut integration of shuffle service with Yarn aux service
b54a0c4 [Andrew Or] Initial skeleton for Yarn shuffle service


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/61a5cced
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/61a5cced
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/61a5cced

Branch: refs/heads/master
Commit: 61a5cced049a8056292ba94f23fa7bd040f50685
Parents: f37817b
Author: Andrew Or <an...@databricks.com>
Authored: Wed Nov 5 15:42:05 2014 -0800
Committer: Andrew Or <an...@databricks.com>
Committed: Wed Nov 5 15:42:05 2014 -0800

----------------------------------------------------------------------
 .../spark/ExecutorAllocationManager.scala       |  37 ++--
 .../org/apache/spark/storage/BlockManager.scala |   8 +-
 .../scala/org/apache/spark/util/Utils.scala     |  16 ++
 make-distribution.sh                            |   3 +
 .../network/sasl/ShuffleSecretManager.java      | 117 ++++++++++++
 network/yarn/pom.xml                            |  58 ++++++
 .../spark/network/yarn/YarnShuffleService.java  | 176 +++++++++++++++++++
 .../network/yarn/util/HadoopConfigProvider.java |  42 +++++
 pom.xml                                         |   2 +
 project/SparkBuild.scala                        |   8 +-
 .../spark/deploy/yarn/ExecutorRunnable.scala    |  16 ++
 .../spark/deploy/yarn/ExecutorRunnable.scala    |  16 ++
 12 files changed, 483 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/61a5cced/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index c11f1db..ef93009 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -66,7 +66,6 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
   // Lower and upper bounds on the number of executors. These are required.
   private val minNumExecutors = conf.getInt("spark.dynamicAllocation.minExecutors", -1)
   private val maxNumExecutors = conf.getInt("spark.dynamicAllocation.maxExecutors", -1)
-  verifyBounds()
 
   // How long there must be backlogged tasks for before an addition is triggered
   private val schedulerBacklogTimeout = conf.getLong(
@@ -77,9 +76,14 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
     "spark.dynamicAllocation.sustainedSchedulerBacklogTimeout", schedulerBacklogTimeout)
 
   // How long an executor must be idle for before it is removed
-  private val removeThresholdSeconds = conf.getLong(
+  private val executorIdleTimeout = conf.getLong(
     "spark.dynamicAllocation.executorIdleTimeout", 600)
 
+  // During testing, the methods to actually kill and add executors are mocked out
+  private val testing = conf.getBoolean("spark.dynamicAllocation.testing", false)
+
+  validateSettings()
+
   // Number of executors to add in the next round
   private var numExecutorsToAdd = 1
 
@@ -103,17 +107,14 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
   // Polling loop interval (ms)
   private val intervalMillis: Long = 100
 
-  // Whether we are testing this class. This should only be used internally.
-  private val testing = conf.getBoolean("spark.dynamicAllocation.testing", false)
-
   // Clock used to schedule when executors should be added and removed
   private var clock: Clock = new RealClock
 
   /**
-   * Verify that the lower and upper bounds on the number of executors are valid.
+   * Verify that the settings specified through the config are valid.
    * If not, throw an appropriate exception.
    */
-  private def verifyBounds(): Unit = {
+  private def validateSettings(): Unit = {
     if (minNumExecutors < 0 || maxNumExecutors < 0) {
       throw new SparkException("spark.dynamicAllocation.{min/max}Executors must be set!")
     }
@@ -124,6 +125,22 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
       throw new SparkException(s"spark.dynamicAllocation.minExecutors ($minNumExecutors) must " +
         s"be less than or equal to spark.dynamicAllocation.maxExecutors ($maxNumExecutors)!")
     }
+    if (schedulerBacklogTimeout <= 0) {
+      throw new SparkException("spark.dynamicAllocation.schedulerBacklogTimeout must be > 0!")
+    }
+    if (sustainedSchedulerBacklogTimeout <= 0) {
+      throw new SparkException(
+        "spark.dynamicAllocation.sustainedSchedulerBacklogTimeout must be > 0!")
+    }
+    if (executorIdleTimeout <= 0) {
+      throw new SparkException("spark.dynamicAllocation.executorIdleTimeout must be > 0!")
+    }
+    // Require external shuffle service for dynamic allocation
+    // Otherwise, we may lose shuffle files when killing executors
+    if (!conf.getBoolean("spark.shuffle.service.enabled", false) && !testing) {
+      throw new SparkException("Dynamic allocation of executors requires the external " +
+        "shuffle service. You may enable this through spark.shuffle.service.enabled.")
+    }
   }
 
   /**
@@ -254,7 +271,7 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
     val removeRequestAcknowledged = testing || sc.killExecutor(executorId)
     if (removeRequestAcknowledged) {
       logInfo(s"Removing executor $executorId because it has been idle for " +
-        s"$removeThresholdSeconds seconds (new desired total will be ${numExistingExecutors - 1})")
+        s"$executorIdleTimeout seconds (new desired total will be ${numExistingExecutors - 1})")
       executorsPendingToRemove.add(executorId)
       true
     } else {
@@ -329,8 +346,8 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
   private def onExecutorIdle(executorId: String): Unit = synchronized {
     if (!removeTimes.contains(executorId) && !executorsPendingToRemove.contains(executorId)) {
       logDebug(s"Starting idle timer for $executorId because there are no more tasks " +
-        s"scheduled to run on the executor (to expire in $removeThresholdSeconds seconds)")
-      removeTimes(executorId) = clock.getTimeMillis + removeThresholdSeconds * 1000
+        s"scheduled to run on the executor (to expire in $executorIdleTimeout seconds)")
+      removeTimes(executorId) = clock.getTimeMillis + executorIdleTimeout * 1000
     }
   }
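
To make the validation above concrete, here is a minimal sketch of a SparkConf that satisfies validateSettings() (all values are illustrative; the keys are exactly the ones checked above):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.dynamicAllocation.minExecutors", "2")    // must be set and >= 0
      .set("spark.dynamicAllocation.maxExecutors", "10")   // must be >= minExecutors
      .set("spark.dynamicAllocation.schedulerBacklogTimeout", "60")  // seconds, > 0
      .set("spark.dynamicAllocation.executorIdleTimeout", "600")     // seconds, > 0
      .set("spark.shuffle.service.enabled", "true")  // now required unless testing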
 

http://git-wip-us.apache.org/repos/asf/spark/blob/61a5cced/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index a5fb87b..e48d777 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -40,7 +40,6 @@ import org.apache.spark.network.util.{ConfigProvider, TransportConf}
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.shuffle.ShuffleManager
 import org.apache.spark.shuffle.hash.HashShuffleManager
-import org.apache.spark.shuffle.sort.SortShuffleManager
 import org.apache.spark.util._
 
 private[spark] sealed trait BlockValues
@@ -97,7 +96,12 @@ private[spark] class BlockManager(
 
   private[spark]
   val externalShuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", false)
-  private val externalShuffleServicePort = conf.getInt("spark.shuffle.service.port", 7337)
+
+  // Port used by the external shuffle service. In Yarn mode, this may already be
+  // set through the Hadoop configuration as the server is launched in the Yarn NM.
+  private val externalShuffleServicePort =
+    Utils.getSparkOrYarnConfig(conf, "spark.shuffle.service.port", "7337").toInt
+
   // Check that we're not using external shuffle service with consolidated shuffle files.
   if (externalShuffleServiceEnabled
       && conf.getBoolean("spark.shuffle.consolidateFiles", false)

http://git-wip-us.apache.org/repos/asf/spark/blob/61a5cced/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 6ab94af..7caf6bc 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -45,6 +45,7 @@ import org.json4s._
 import tachyon.client.{TachyonFile,TachyonFS}
 
 import org.apache.spark._
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance}
 
 /** CallSite represents a place in user code. It can have a short and a long form. */
@@ -1780,6 +1781,21 @@ private[spark] object Utils extends Logging {
       val manifest = new JarManifest(manifestUrl.openStream())
       manifest.getMainAttributes.getValue(Name.IMPLEMENTATION_VERSION)
     }.getOrElse("Unknown")
+
+  /**
+   * Return the value of a config either through the SparkConf or the Hadoop configuration
+   * if this is Yarn mode. In the latter case, this defaults to the value set through SparkConf
+   * if the key is not set in the Hadoop configuration.
+   */
+  def getSparkOrYarnConfig(conf: SparkConf, key: String, default: String): String = {
+    val sparkValue = conf.get(key, default)
+    if (SparkHadoopUtil.get.isYarnMode) {
+      SparkHadoopUtil.get.newConfiguration(conf).get(key, sparkValue)
+    } else {
+      sparkValue
+    }
+  }
+
 }
 
 /**
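
To illustrate the lookup order, here is a short sketch using the shuffle service port, as BlockManager does above (the yarn-site value is hypothetical):

    import org.apache.spark.SparkConf
    import org.apache.spark.util.Utils

    val conf = new SparkConf()
    // Not set in SparkConf, so sparkValue falls back to the default "7337".
    // In Yarn mode the Hadoop configuration is consulted next: if yarn-site.xml
    // sets spark.shuffle.service.port=7447, this returns 7447; otherwise 7337.
    val port = Utils.getSparkOrYarnConfig(conf, "spark.shuffle.service.port", "7337").toInt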

http://git-wip-us.apache.org/repos/asf/spark/blob/61a5cced/make-distribution.sh
----------------------------------------------------------------------
diff --git a/make-distribution.sh b/make-distribution.sh
index 0bc839e..fac7f7e 100755
--- a/make-distribution.sh
+++ b/make-distribution.sh
@@ -181,6 +181,9 @@ echo "Spark $VERSION$GITREVSTRING built for Hadoop $SPARK_HADOOP_VERSION" > "$DI
 # Copy jars
 cp "$FWDIR"/assembly/target/scala*/*assembly*hadoop*.jar "$DISTDIR/lib/"
 cp "$FWDIR"/examples/target/scala*/spark-examples*.jar "$DISTDIR/lib/"
+cp "$FWDIR"/network/yarn/target/scala*/spark-network-yarn*.jar "$DISTDIR/lib/"
+cp "$FWDIR"/network/yarn/target/scala*/spark-network-shuffle*.jar "$DISTDIR/lib/"
+cp "$FWDIR"/network/yarn/target/scala*/spark-network-common*.jar "$DISTDIR/lib/"
 
 # Copy example sources (needed for python and SQL)
 mkdir -p "$DISTDIR/examples/src/main"

http://git-wip-us.apache.org/repos/asf/spark/blob/61a5cced/network/shuffle/src/main/java/org/apache/spark/network/sasl/ShuffleSecretManager.java
----------------------------------------------------------------------
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/sasl/ShuffleSecretManager.java b/network/shuffle/src/main/java/org/apache/spark/network/sasl/ShuffleSecretManager.java
new file mode 100644
index 0000000..e66c4af
--- /dev/null
+++ b/network/shuffle/src/main/java/org/apache/spark/network/sasl/ShuffleSecretManager.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.sasl;
+
+import java.lang.Override;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.spark.network.sasl.SecretKeyHolder;
+
+/**
+ * A class that manages the shuffle secrets used by the external shuffle service.
+ */
+public class ShuffleSecretManager implements SecretKeyHolder {
+  private final Logger logger = LoggerFactory.getLogger(ShuffleSecretManager.class);
+  private final ConcurrentHashMap<String, String> shuffleSecretMap;
+
+  private static final Charset UTF8_CHARSET = Charset.forName("UTF-8");
+
+  // Spark user used for authenticating SASL connections
+  // Note that this must match the value in org.apache.spark.SecurityManager
+  private static final String SPARK_SASL_USER = "sparkSaslUser";
+
+  /**
+   * Convert the given string to a byte buffer. The resulting buffer can be converted back to
+   * the same string through {@link #bytesToString(ByteBuffer)}. This is used if the external
+   * shuffle service represents shuffle secrets as byte buffers instead of strings.
+   */
+  public static ByteBuffer stringToBytes(String s) {
+    return ByteBuffer.wrap(s.getBytes(UTF8_CHARSET));
+  }
+
+  /**
+   * Convert the given byte buffer to a string. The resulting string can be converted back to
+   * the same byte buffer through {@link #stringToBytes(String)}. This is used if the external
+   * shuffle service represents shuffle secrets as byte buffers instead of strings.
+   */
+  public static String bytesToString(ByteBuffer b) {
+    return new String(b.array(), UTF8_CHARSET);
+  }
+
+  public ShuffleSecretManager() {
+    shuffleSecretMap = new ConcurrentHashMap<String, String>();
+  }
+
+  /**
+   * Register an application with its secret.
+   * Executors need to first authenticate themselves with the same secret before
+   * fetching shuffle files written by other executors in this application.
+   */
+  public void registerApp(String appId, String shuffleSecret) {
+    if (!shuffleSecretMap.containsKey(appId)) {
+      shuffleSecretMap.put(appId, shuffleSecret);
+      logger.info("Registered shuffle secret for application {}", appId);
+    } else {
+      logger.debug("Application {} already registered", appId);
+    }
+  }
+
+  /**
+   * Register an application with its secret specified as a byte buffer.
+   */
+  public void registerApp(String appId, ByteBuffer shuffleSecret) {
+    registerApp(appId, bytesToString(shuffleSecret));
+  }
+
+  /**
+   * Unregister an application along with its secret.
+   * This is called when the application terminates.
+   */
+  public void unregisterApp(String appId) {
+    if (shuffleSecretMap.containsKey(appId)) {
+      shuffleSecretMap.remove(appId);
+      logger.info("Unregistered shuffle secret for application {}", appId);
+    } else {
+      logger.warn("Attempted to unregister application {} when it is not registered", appId);
+    }
+  }
+
+  /**
+   * Return the Spark user for authenticating SASL connections.
+   */
+  @Override
+  public String getSaslUser(String appId) {
+    return SPARK_SASL_USER;
+  }
+
+  /**
+   * Return the secret key registered with the given application.
+   * This key is used to authenticate the executors before they can fetch shuffle files
+   * written by this application from the external shuffle service. If the specified
+   * application is not registered, return null.
+   */
+  @Override
+  public String getSecretKey(String appId) {
+    return shuffleSecretMap.get(appId);
+  }
+}
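
A brief usage sketch of the manager's lifecycle as the shuffle service drives it (the application id and secret below are illustrative):

    import org.apache.spark.network.sasl.ShuffleSecretManager

    val mgr = new ShuffleSecretManager()
    // Secrets arrive from Yarn as byte buffers; the round trip below is lossless
    val secret = ShuffleSecretManager.stringToBytes("app-secret")
    mgr.registerApp("application_1415000000000_0001", secret)
    assert(mgr.getSecretKey("application_1415000000000_0001") == "app-secret")
    mgr.unregisterApp("application_1415000000000_0001")  // on application termination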

http://git-wip-us.apache.org/repos/asf/spark/blob/61a5cced/network/yarn/pom.xml
----------------------------------------------------------------------
diff --git a/network/yarn/pom.xml b/network/yarn/pom.xml
new file mode 100644
index 0000000..e60d8c1
--- /dev/null
+++ b/network/yarn/pom.xml
@@ -0,0 +1,58 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.spark</groupId>
+    <artifactId>spark-parent</artifactId>
+    <version>1.2.0-SNAPSHOT</version>
+    <relativePath>../../pom.xml</relativePath>
+  </parent>
+
+  <groupId>org.apache.spark</groupId>
+  <artifactId>spark-network-yarn_2.10</artifactId>
+  <packaging>jar</packaging>
+  <name>Spark Project Yarn Shuffle Service Code</name>
+  <url>http://spark.apache.org/</url>
+  <properties>
+    <sbt.project.name>network-yarn</sbt.project.name>
+  </properties>
+
+  <dependencies>
+    <!-- Core dependencies -->
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-network-shuffle_2.10</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <!-- Provided dependencies -->
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-client</artifactId>
+      <scope>provided</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+    <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+  </build>
+</project>

http://git-wip-us.apache.org/repos/asf/spark/blob/61a5cced/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
----------------------------------------------------------------------
diff --git a/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
new file mode 100644
index 0000000..bb0b8f7
--- /dev/null
+++ b/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.yarn;
+
+import java.lang.Override;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.server.api.AuxiliaryService;
+import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
+import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
+import org.apache.hadoop.yarn.server.api.ContainerInitializationContext;
+import org.apache.hadoop.yarn.server.api.ContainerTerminationContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.spark.network.TransportContext;
+import org.apache.spark.network.sasl.SaslRpcHandler;
+import org.apache.spark.network.sasl.ShuffleSecretManager;
+import org.apache.spark.network.server.RpcHandler;
+import org.apache.spark.network.server.TransportServer;
+import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler;
+import org.apache.spark.network.util.TransportConf;
+import org.apache.spark.network.yarn.util.HadoopConfigProvider;
+
+/**
+ * An external shuffle service used by Spark on Yarn.
+ *
+ * This is intended to be a long-running auxiliary service that runs in the NodeManager process.
+ * A Spark application may connect to this service by setting `spark.shuffle.service.enabled`.
+ * The application also automatically derives the service port through `spark.shuffle.service.port`
+ * specified in the Yarn configuration. This is so that both the clients and the server agree on
+ * the same port to communicate on.
+ *
+ * The service also optionally supports authentication. This ensures that executors from one
+ * application cannot read the shuffle files written by those from another. This feature can be
+ * enabled by setting `spark.authenticate` in the Yarn configuration before starting the NM.
+ * Note that the Spark application must also set `spark.authenticate` manually and, unlike in
+ * the case of the service port, will not inherit this setting from the Yarn configuration. This
+ * is because an application running on the same Yarn cluster may choose to not use the external
+ * shuffle service, in which case its setting of `spark.authenticate` should be independent of
+ * the service's.
+ */
+public class YarnShuffleService extends AuxiliaryService {
+  private final Logger logger = LoggerFactory.getLogger(YarnShuffleService.class);
+
+  // Port on which the shuffle server listens for fetch requests
+  private static final String SPARK_SHUFFLE_SERVICE_PORT_KEY = "spark.shuffle.service.port";
+  private static final int DEFAULT_SPARK_SHUFFLE_SERVICE_PORT = 7337;
+
+  // Whether the shuffle server should authenticate fetch requests
+  private static final String SPARK_AUTHENTICATE_KEY = "spark.authenticate";
+  private static final boolean DEFAULT_SPARK_AUTHENTICATE = false;
+
+  // An entity that manages the shuffle secret per application
+  // This is used only if authentication is enabled
+  private ShuffleSecretManager secretManager;
+
+  // The actual server that serves shuffle files
+  private TransportServer shuffleServer = null;
+
+  public YarnShuffleService() {
+    super("spark_shuffle");
+    logger.info("Initializing YARN shuffle service for Spark");
+  }
+
+  /**
+   * Return whether authentication is enabled as specified by the configuration.
+   * If so, fetch requests will fail unless the appropriate authentication secret
+   * for the application is provided.
+   */
+  private boolean isAuthenticationEnabled() {
+    return secretManager != null;
+  }
+
+  /**
+   * Start the shuffle server with the given configuration.
+   */
+  @Override
+  protected void serviceInit(Configuration conf) {
+    // If authentication is enabled, set up the shuffle server to use a
+    // special RPC handler that filters out unauthenticated fetch requests
+    boolean authEnabled = conf.getBoolean(SPARK_AUTHENTICATE_KEY, DEFAULT_SPARK_AUTHENTICATE);
+    RpcHandler rpcHandler = new ExternalShuffleBlockHandler();
+    if (authEnabled) {
+      secretManager = new ShuffleSecretManager();
+      rpcHandler = new SaslRpcHandler(rpcHandler, secretManager);
+    }
+
+    int port = conf.getInt(
+      SPARK_SHUFFLE_SERVICE_PORT_KEY, DEFAULT_SPARK_SHUFFLE_SERVICE_PORT);
+    TransportConf transportConf = new TransportConf(new HadoopConfigProvider(conf));
+    TransportContext transportContext = new TransportContext(transportConf, rpcHandler);
+    shuffleServer = transportContext.createServer(port);
+    String authEnabledString = authEnabled ? "enabled" : "not enabled";
+    logger.info("Started YARN shuffle service for Spark on port {}. " +
+      "Authentication is {}.", port, authEnabledString);
+  }
+
+  @Override
+  public void initializeApplication(ApplicationInitializationContext context) {
+    String appId = context.getApplicationId().toString();
+    try {
+      ByteBuffer shuffleSecret = context.getApplicationDataForService();
+      logger.info("Initializing application {}", appId);
+      if (isAuthenticationEnabled()) {
+        secretManager.registerApp(appId, shuffleSecret);
+      }
+    } catch (Exception e) {
+      logger.error("Exception when initializing application {}", appId, e);
+    }
+  }
+
+  @Override
+  public void stopApplication(ApplicationTerminationContext context) {
+    String appId = context.getApplicationId().toString();
+    try {
+      logger.info("Stopping application {}", appId);
+      if (isAuthenticationEnabled()) {
+        secretManager.unregisterApp(appId);
+      }
+    } catch (Exception e) {
+      logger.error("Exception when stopping application {}", appId, e);
+    }
+  }
+
+  @Override
+  public void initializeContainer(ContainerInitializationContext context) {
+    ContainerId containerId = context.getContainerId();
+    logger.info("Initializing container {}", containerId);
+  }
+
+  @Override
+  public void stopContainer(ContainerTerminationContext context) {
+    ContainerId containerId = context.getContainerId();
+    logger.info("Stopping container {}", containerId);
+  }
+
+  /**
+   * Close the shuffle server to clean up any associated state.
+   */
+  @Override
+  protected void serviceStop() {
+    try {
+      if (shuffleServer != null) {
+        shuffleServer.close();
+      }
+    } catch (Exception e) {
+      logger.error("Exception when stopping service", e);
+    }
+  }
+
+  // Not currently used
+  @Override
+  public ByteBuffer getMetaData() {
+    return ByteBuffer.allocate(0);
+  }
+
+}
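
To restate the authentication contract from the javadoc above: the NodeManager reads `spark.authenticate` from the Hadoop configuration in serviceInit, while each application must opt in on its own. A sketch of the application-side settings (values illustrative):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.shuffle.service.enabled", "true")  // connect to the Yarn aux service
      .set("spark.authenticate", "true")             // must match the NM-side setting

The application's secret then reaches the service through Yarn's service data mechanism, as the ExecutorRunnable changes below show.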

http://git-wip-us.apache.org/repos/asf/spark/blob/61a5cced/network/yarn/src/main/java/org/apache/spark/network/yarn/util/HadoopConfigProvider.java
----------------------------------------------------------------------
diff --git a/network/yarn/src/main/java/org/apache/spark/network/yarn/util/HadoopConfigProvider.java b/network/yarn/src/main/java/org/apache/spark/network/yarn/util/HadoopConfigProvider.java
new file mode 100644
index 0000000..8848617
--- /dev/null
+++ b/network/yarn/src/main/java/org/apache/spark/network/yarn/util/HadoopConfigProvider.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.yarn.util;
+
+import java.util.NoSuchElementException;
+
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.spark.network.util.ConfigProvider;
+
+/** Use the Hadoop configuration to obtain config values. */
+public class HadoopConfigProvider extends ConfigProvider {
+  private final Configuration conf;
+
+  public HadoopConfigProvider(Configuration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public String get(String name) {
+    String value = conf.get(name);
+    if (value == null) {
+      throw new NoSuchElementException(name);
+    }
+    return value;
+  }
+}
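
A minimal sketch of how this provider is wired in, mirroring serviceInit in YarnShuffleService above:

    import org.apache.hadoop.conf.Configuration
    import org.apache.spark.network.util.TransportConf
    import org.apache.spark.network.yarn.util.HadoopConfigProvider

    // The transport layer reads its settings out of the NM's Hadoop
    // configuration through this adapter; get() throws NoSuchElementException
    // for unset keys, signaling that defaults should apply
    val transportConf = new TransportConf(new HadoopConfigProvider(new Configuration()))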

http://git-wip-us.apache.org/repos/asf/spark/blob/61a5cced/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index eb61353..88ef67c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1229,6 +1229,7 @@
       <id>yarn-alpha</id>
       <modules>
         <module>yarn</module>
+        <module>network/yarn</module>
       </modules>
     </profile>
 
@@ -1236,6 +1237,7 @@
       <id>yarn</id>
       <modules>
         <module>yarn</module>
+        <module>network/yarn</module>
       </modules>
     </profile>
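
Since network/yarn is listed only under the Yarn profiles, it is compiled only when one of them is active. A build sketch (profile flags illustrative):

    # network/yarn participates in the build only with -Pyarn (or -Pyarn-alpha)
    mvn -Pyarn -DskipTests package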
 

http://git-wip-us.apache.org/repos/asf/spark/blob/61a5cced/project/SparkBuild.scala
----------------------------------------------------------------------
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 33618f5..657e4b4 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -38,9 +38,9 @@ object BuildCommons {
       "streaming-flume", "streaming-kafka", "streaming-mqtt", "streaming-twitter",
       "streaming-zeromq").map(ProjectRef(buildLocation, _))
 
-  val optionallyEnabledProjects@Seq(yarn, yarnStable, yarnAlpha, java8Tests, sparkGangliaLgpl, sparkKinesisAsl) =
-    Seq("yarn", "yarn-stable", "yarn-alpha", "java8-tests", "ganglia-lgpl", "kinesis-asl")
-      .map(ProjectRef(buildLocation, _))
+  val optionallyEnabledProjects@Seq(yarn, yarnStable, yarnAlpha, networkYarn, java8Tests,
+    sparkGangliaLgpl, sparkKinesisAsl) = Seq("yarn", "yarn-stable", "yarn-alpha", "network-yarn",
+    "java8-tests", "ganglia-lgpl", "kinesis-asl").map(ProjectRef(buildLocation, _))
 
   val assemblyProjects@Seq(assembly, examples) = Seq("assembly", "examples")
     .map(ProjectRef(buildLocation, _))
@@ -143,7 +143,7 @@ object SparkBuild extends PomBuild {
 
   // TODO: Add Sql to mima checks
   allProjects.filterNot(x => Seq(spark, sql, hive, hiveThriftServer, catalyst, repl,
-    streamingFlumeSink, networkCommon, networkShuffle).contains(x)).foreach {
+    streamingFlumeSink, networkCommon, networkShuffle, networkYarn).contains(x)).foreach {
       x => enable(MimaBuild.mimaSettings(sparkHome, x))(x)
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/61a5cced/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
----------------------------------------------------------------------
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index 7ee4b5c..5f47c79 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.ipc.YarnRPC
 import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records, ProtoUtils}
 
 import org.apache.spark.{SecurityManager, SparkConf, Logging}
+import org.apache.spark.network.sasl.ShuffleSecretManager
 
 @deprecated("use yarn/stable", "1.2.0")
 class ExecutorRunnable(
@@ -90,6 +91,21 @@ class ExecutorRunnable(
 
     ctx.setApplicationACLs(YarnSparkHadoopUtil.getApplicationAclsForYarn(securityMgr))
 
+    // If external shuffle service is enabled, register with the Yarn shuffle service already
+    // started on the NodeManager and, if authentication is enabled, provide it with our secret
+    // key for fetching shuffle files later
+    if (sparkConf.getBoolean("spark.shuffle.service.enabled", false)) {
+      val secretString = securityMgr.getSecretKey()
+      val secretBytes =
+        if (secretString != null) {
+          ShuffleSecretManager.stringToBytes(secretString)
+        } else {
+          // Authentication is not enabled, so just provide dummy metadata
+          ByteBuffer.allocate(0)
+        }
+      ctx.setServiceData(Map[String, ByteBuffer]("spark_shuffle" -> secretBytes))
+    }
+
     // Send the start request to the ContainerManager
     val startReq = Records.newRecord(classOf[StartContainerRequest])
     .asInstanceOf[StartContainerRequest]

http://git-wip-us.apache.org/repos/asf/spark/blob/61a5cced/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
----------------------------------------------------------------------
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index 0b5a92d..18f48b4 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.ipc.YarnRPC
 import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records}
 
 import org.apache.spark.{SecurityManager, SparkConf, Logging}
+import org.apache.spark.network.sasl.ShuffleSecretManager
 
 
 class ExecutorRunnable(
@@ -89,6 +90,21 @@ class ExecutorRunnable(
 
     ctx.setApplicationACLs(YarnSparkHadoopUtil.getApplicationAclsForYarn(securityMgr))
 
+    // If external shuffle service is enabled, register with the Yarn shuffle service already
+    // started on the NodeManager and, if authentication is enabled, provide it with our secret
+    // key for fetching shuffle files later
+    if (sparkConf.getBoolean("spark.shuffle.service.enabled", false)) {
+      val secretString = securityMgr.getSecretKey()
+      val secretBytes =
+        if (secretString != null) {
+          ShuffleSecretManager.stringToBytes(secretString)
+        } else {
+          // Authentication is not enabled, so just provide dummy metadata
+          ByteBuffer.allocate(0)
+        }
+      ctx.setServiceData(Map[String, ByteBuffer]("spark_shuffle" -> secretBytes))
+    }
+
     // Send the start request to the ContainerManager
     nmClient.startContainer(container, ctx)
   }

