You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by an...@apache.org on 2015/04/28 21:08:21 UTC

spark git commit: [SPARK-4286] Add an external shuffle service that can be run as a daemon.

Repository: spark
Updated Branches:
  refs/heads/master 52ccf1d37 -> 8aab94d89


[SPARK-4286] Add an external shuffle service that can be run as a daemon.

This allows Mesos deployments to use the shuffle service (and implicitly dynamic allocation). It does so by adding a new "main" class and two corresponding scripts in `sbin`:

- `sbin/start-shuffle-service.sh`
- `sbin/stop-shuffle-service.sh`

Specific options can be passed in `SPARK_SHUFFLE_OPTS`.

This is picking up work from #3861 /cc tnachen

Author: Iulian Dragos <ja...@gmail.com>

Closes #4990 from dragos/feature/external-shuffle-service and squashes the following commits:

6c2b148 [Iulian Dragos] Import order and wrong name fixup.
07804ad [Iulian Dragos] Moved ExternalShuffleService to the `deploy` package + other minor tweaks.
4dc1f91 [Iulian Dragos] Reviewer’s comments:
8145429 [Iulian Dragos] Add an external shuffle service that can be run as a daemon.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8aab94d8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8aab94d8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8aab94d8

Branch: refs/heads/master
Commit: 8aab94d8984e9d12194dbda47b2e7d9dbc036889
Parents: 52ccf1d
Author: Iulian Dragos <ja...@gmail.com>
Authored: Tue Apr 28 12:08:18 2015 -0700
Committer: Andrew Or <an...@databricks.com>
Committed: Tue Apr 28 12:08:18 2015 -0700

----------------------------------------------------------------------
 conf/spark-env.sh.template                      |   3 +-
 .../spark/deploy/ExternalShuffleService.scala   | 111 +++++++++++++++++++
 .../worker/StandaloneWorkerShuffleService.scala |  66 -----------
 .../org/apache/spark/deploy/worker/Worker.scala |  13 ++-
 docs/job-scheduling.md                          |   2 +-
 .../launcher/SparkClassCommandBuilder.java      |   4 +
 sbin/start-shuffle-service.sh                   |  33 ++++++
 sbin/stop-shuffle-service.sh                    |  25 +++++
 8 files changed, 183 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8aab94d8/conf/spark-env.sh.template
----------------------------------------------------------------------
diff --git a/conf/spark-env.sh.template b/conf/spark-env.sh.template
index 67f81d3..43c4288 100755
--- a/conf/spark-env.sh.template
+++ b/conf/spark-env.sh.template
@@ -3,7 +3,7 @@
 # This file is sourced when running various Spark programs.
 # Copy it as spark-env.sh and edit that to configure Spark for your site.
 
-# Options read when launching programs locally with 
+# Options read when launching programs locally with
 # ./bin/run-example or ./bin/spark-submit
 # - HADOOP_CONF_DIR, to point Spark towards Hadoop configuration files
 # - SPARK_LOCAL_IP, to set the IP address Spark binds to on this node
@@ -39,6 +39,7 @@
 # - SPARK_WORKER_DIR, to set the working directory of worker processes
 # - SPARK_WORKER_OPTS, to set config properties only for the worker (e.g. "-Dx=y")
 # - SPARK_HISTORY_OPTS, to set config properties only for the history server (e.g. "-Dx=y")
+# - SPARK_SHUFFLE_OPTS, to set config properties only for the external shuffle service (e.g. "-Dx=y")
 # - SPARK_DAEMON_JAVA_OPTS, to set config properties for all daemons (e.g. "-Dx=y")
 # - SPARK_PUBLIC_DNS, to set the public dns name of the master or workers
 

http://git-wip-us.apache.org/repos/asf/spark/blob/8aab94d8/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
new file mode 100644
index 0000000..cd16f99
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy
+
+import java.util.concurrent.CountDownLatch
+
+import org.apache.spark.{Logging, SparkConf, SecurityManager}
+import org.apache.spark.network.TransportContext
+import org.apache.spark.network.netty.SparkTransportConf
+import org.apache.spark.network.sasl.SaslRpcHandler
+import org.apache.spark.network.server.TransportServer
+import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler
+import org.apache.spark.util.Utils
+
+/**
+ * Provides a server from which Executors can read shuffle files (rather than reading directly from
+ * each other), to provide uninterrupted access to the files in the face of executors being turned
+ * off or killed.
+ *
+ * Optionally requires SASL authentication in order to read. See [[SecurityManager]].
+ */
+private[deploy]
+class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityManager)
+  extends Logging {
+
+  private val enabled = sparkConf.getBoolean("spark.shuffle.service.enabled", false)
+  private val port = sparkConf.getInt("spark.shuffle.service.port", 7337)
+  private val useSasl: Boolean = securityManager.isAuthenticationEnabled()
+
+  private val transportConf = SparkTransportConf.fromSparkConf(sparkConf, numUsableCores = 0)
+  private val blockHandler = new ExternalShuffleBlockHandler(transportConf)
+  private val transportContext: TransportContext = {
+    val handler = if (useSasl) new SaslRpcHandler(blockHandler, securityManager) else blockHandler
+    new TransportContext(transportConf, handler)
+  }
+
+  private var server: TransportServer = _
+
+  /** Starts the external shuffle service if the user has configured us to. */
+  def startIfEnabled() {
+    if (enabled) {
+      start()
+    }
+  }
+
+  /** Start the external shuffle service */
+  def start() {
+    require(server == null, "Shuffle server already started")
+    logInfo(s"Starting shuffle service on port $port with useSasl = $useSasl")
+    server = transportContext.createServer(port)
+  }
+
+  def stop() {
+    if (server != null) {
+      server.close()
+      server = null
+    }
+  }
+}
+
+/**
+ * A main class for running the external shuffle service.
+ */
+object ExternalShuffleService extends Logging {
+  @volatile
+  private var server: ExternalShuffleService = _
+
+  private val barrier = new CountDownLatch(1)
+
+  def main(args: Array[String]): Unit = {
+    val sparkConf = new SparkConf
+    Utils.loadDefaultSparkProperties(sparkConf)
+    val securityManager = new SecurityManager(sparkConf)
+
+    // we override this value since this service is started from the command line
+    // and we assume the user really wants it to be running
+    sparkConf.set("spark.shuffle.service.enabled", "true")
+    server = new ExternalShuffleService(sparkConf, securityManager)
+    server.start()
+
+    installShutdownHook()
+
+    // keep running until the process is terminated
+    barrier.await()
+  }
+
+  private def installShutdownHook(): Unit = {
+    Runtime.getRuntime.addShutdownHook(new Thread("External Shuffle Service shutdown thread") {
+      override def run() {
+        logInfo("Shutting down shuffle service.")
+        server.stop()
+        barrier.countDown()
+      }
+    })
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/8aab94d8/core/src/main/scala/org/apache/spark/deploy/worker/StandaloneWorkerShuffleService.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/StandaloneWorkerShuffleService.scala b/core/src/main/scala/org/apache/spark/deploy/worker/StandaloneWorkerShuffleService.scala
deleted file mode 100644
index b979896..0000000
--- a/core/src/main/scala/org/apache/spark/deploy/worker/StandaloneWorkerShuffleService.scala
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.worker
-
-import org.apache.spark.{Logging, SparkConf, SecurityManager}
-import org.apache.spark.network.TransportContext
-import org.apache.spark.network.netty.SparkTransportConf
-import org.apache.spark.network.sasl.SaslRpcHandler
-import org.apache.spark.network.server.TransportServer
-import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler
-
-/**
- * Provides a server from which Executors can read shuffle files (rather than reading directly from
- * each other), to provide uninterrupted access to the files in the face of executors being turned
- * off or killed.
- *
- * Optionally requires SASL authentication in order to read. See [[SecurityManager]].
- */
-private[worker]
-class StandaloneWorkerShuffleService(sparkConf: SparkConf, securityManager: SecurityManager)
-  extends Logging {
-
-  private val enabled = sparkConf.getBoolean("spark.shuffle.service.enabled", false)
-  private val port = sparkConf.getInt("spark.shuffle.service.port", 7337)
-  private val useSasl: Boolean = securityManager.isAuthenticationEnabled()
-
-  private val transportConf = SparkTransportConf.fromSparkConf(sparkConf, numUsableCores = 0)
-  private val blockHandler = new ExternalShuffleBlockHandler(transportConf)
-  private val transportContext: TransportContext = {
-    val handler = if (useSasl) new SaslRpcHandler(blockHandler, securityManager) else blockHandler
-    new TransportContext(transportConf, handler)
-  }
-
-  private var server: TransportServer = _
-
-  /** Starts the external shuffle service if the user has configured us to. */
-  def startIfEnabled() {
-    if (enabled) {
-      require(server == null, "Shuffle server already started")
-      logInfo(s"Starting shuffle service on port $port with useSasl = $useSasl")
-      server = transportContext.createServer(port)
-    }
-  }
-
-  def stop() {
-    if (enabled && server != null) {
-      server.close()
-      server = null
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/8aab94d8/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 3ee2eb6..8f3cc54 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -34,6 +34,7 @@ import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
 import org.apache.spark.{Logging, SecurityManager, SparkConf}
 import org.apache.spark.deploy.{Command, ExecutorDescription, ExecutorState}
 import org.apache.spark.deploy.DeployMessages._
+import org.apache.spark.deploy.ExternalShuffleService
 import org.apache.spark.deploy.master.{DriverState, Master}
 import org.apache.spark.deploy.worker.ui.WorkerWebUI
 import org.apache.spark.metrics.MetricsSystem
@@ -61,7 +62,7 @@ private[worker] class Worker(
   assert (port > 0)
 
   // For worker and executor IDs
-  private def createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss")  
+  private def createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss")
 
   // Send a heartbeat every (heartbeat timeout) / 4 milliseconds
   private val HEARTBEAT_MILLIS = conf.getLong("spark.worker.timeout", 60) * 1000 / 4
@@ -85,10 +86,10 @@ private[worker] class Worker(
 
   private val CLEANUP_ENABLED = conf.getBoolean("spark.worker.cleanup.enabled", false)
   // How often worker will clean up old app folders
-  private val CLEANUP_INTERVAL_MILLIS = 
+  private val CLEANUP_INTERVAL_MILLIS =
     conf.getLong("spark.worker.cleanup.interval", 60 * 30) * 1000
   // TTL for app folders/data;  after TTL expires it will be cleaned up
-  private val APP_DATA_RETENTION_SECS = 
+  private val APP_DATA_RETENTION_SECS =
     conf.getLong("spark.worker.cleanup.appDataTtl", 7 * 24 * 3600)
 
   private val testing: Boolean = sys.props.contains("spark.testing")
@@ -112,7 +113,7 @@ private[worker] class Worker(
     } else {
       new File(sys.env.get("SPARK_HOME").getOrElse("."))
     }
-  
+
   var workDir: File = null
   val finishedExecutors = new HashMap[String, ExecutorRunner]
   val drivers = new HashMap[String, DriverRunner]
@@ -122,7 +123,7 @@ private[worker] class Worker(
   val finishedApps = new HashSet[String]
 
   // The shuffle service is not actually started unless configured.
-  private val shuffleService = new StandaloneWorkerShuffleService(conf, securityMgr)
+  private val shuffleService = new ExternalShuffleService(conf, securityMgr)
 
   private val publicAddress = {
     val envVar = conf.getenv("SPARK_PUBLIC_DNS")
@@ -134,7 +135,7 @@ private[worker] class Worker(
 
   private val metricsSystem = MetricsSystem.createMetricsSystem("worker", conf, securityMgr)
   private val workerSource = new WorkerSource(this)
-  
+
   private var registrationRetryTimer: Option[Cancellable] = None
 
   var coresUsed = 0

http://git-wip-us.apache.org/repos/asf/spark/blob/8aab94d8/docs/job-scheduling.md
----------------------------------------------------------------------
diff --git a/docs/job-scheduling.md b/docs/job-scheduling.md
index 963e88a..8d9c2ba 100644
--- a/docs/job-scheduling.md
+++ b/docs/job-scheduling.md
@@ -32,7 +32,7 @@ Resource allocation can be configured as follows, based on the cluster type:
 * **Standalone mode:** By default, applications submitted to the standalone mode cluster will run in
   FIFO (first-in-first-out) order, and each application will try to use all available nodes. You can limit
   the number of nodes an application uses by setting the `spark.cores.max` configuration property in it,
-  or change the default for applications that don't set this setting through `spark.deploy.defaultCores`. 
+  or change the default for applications that don't set this setting through `spark.deploy.defaultCores`.
   Finally, in addition to controlling cores, each application's `spark.executor.memory` setting controls
   its memory use.
 * **Mesos:** To use static partitioning on Mesos, set the `spark.mesos.coarse` configuration property to `true`,

http://git-wip-us.apache.org/repos/asf/spark/blob/8aab94d8/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java
----------------------------------------------------------------------
diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java b/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java
index e601a0a..d80abf2 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java
@@ -69,6 +69,10 @@ class SparkClassCommandBuilder extends AbstractCommandBuilder {
     } else if (className.equals("org.apache.spark.executor.MesosExecutorBackend")) {
       javaOptsKeys.add("SPARK_EXECUTOR_OPTS");
       memKey = "SPARK_EXECUTOR_MEMORY";
+    } else if (className.equals("org.apache.spark.deploy.ExternalShuffleService")) {
+      javaOptsKeys.add("SPARK_DAEMON_JAVA_OPTS");
+      javaOptsKeys.add("SPARK_SHUFFLE_OPTS");
+      memKey = "SPARK_DAEMON_MEMORY";
     } else if (className.startsWith("org.apache.spark.tools.")) {
       String sparkHome = getSparkHome();
       File toolsDir = new File(join(File.separator, sparkHome, "tools", "target",

http://git-wip-us.apache.org/repos/asf/spark/blob/8aab94d8/sbin/start-shuffle-service.sh
----------------------------------------------------------------------
diff --git a/sbin/start-shuffle-service.sh b/sbin/start-shuffle-service.sh
new file mode 100755
index 0000000..4fddcf7
--- /dev/null
+++ b/sbin/start-shuffle-service.sh
@@ -0,0 +1,33 @@
+#!/usr/bin/env bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Starts the external shuffle server on the machine this script is executed on.
+#
+# Usage: start-shuffle-server.sh
+#
+# Use the SPARK_SHUFFLE_OPTS environment variable to set shuffle server configuration.
+#
+
+sbin="`dirname "$0"`"
+sbin="`cd "$sbin"; pwd`"
+
+. "$sbin/spark-config.sh"
+. "$SPARK_PREFIX/bin/load-spark-env.sh"
+
+exec "$sbin"/spark-daemon.sh start org.apache.spark.deploy.ExternalShuffleService 1

http://git-wip-us.apache.org/repos/asf/spark/blob/8aab94d8/sbin/stop-shuffle-service.sh
----------------------------------------------------------------------
diff --git a/sbin/stop-shuffle-service.sh b/sbin/stop-shuffle-service.sh
new file mode 100755
index 0000000..4cb6891
--- /dev/null
+++ b/sbin/stop-shuffle-service.sh
@@ -0,0 +1,25 @@
+#!/usr/bin/env bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Stops the external shuffle service on the machine this script is executed on.
+
+sbin="`dirname "$0"`"
+sbin="`cd "$sbin"; pwd`"
+
+"$sbin"/spark-daemon.sh stop org.apache.spark.deploy.ExternalShuffleService 1


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org