Posted to commits@spark.apache.org by an...@apache.org on 2014/10/24 22:32:29 UTC

git commit: [SPARK-4013] Do not create multiple actor systems on each executor

Repository: spark
Updated Branches:
  refs/heads/master 098f83c7c -> b563987e8


[SPARK-4013] Do not create multiple actor systems on each executor

In the existing code, each coarse-grained executor runs two actor systems concurrently. When the executor is lost or killed, we receive a disassociation event from each of these actor systems, so many more error messages are logged than necessary.

This is blocking #2840.
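
For context: every Akka ActorSystem runs its own remoting transport and publishes its own lifecycle events, so two systems on one executor report the same lost driver connection twice. A minimal sketch of how such events surface (the listener class below is hypothetical, not Spark code):

    import akka.actor.Actor
    import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}

    // Each ActorSystem has its own event stream; subscribing in two
    // systems yields two DisassociatedEvents for one lost remote peer.
    class DisassociationLogger extends Actor {
      context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
      def receive = {
        case e: DisassociatedEvent => println(s"remote lost: ${e.remoteAddress}")
      }
    }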

Author: Andrew Or <an...@gmail.com>

Closes #2863 from andrewor14/executor-actor-system and squashes the following commits:

44ce2e0 [Andrew Or] Avoid starting two actor systems on each executor


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b563987e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b563987e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b563987e

Branch: refs/heads/master
Commit: b563987e8dffc2aed1a834d555589a41cfb2a706
Parents: 098f83c
Author: Andrew Or <an...@gmail.com>
Authored: Fri Oct 24 13:32:23 2014 -0700
Committer: Andrew Or <an...@databricks.com>
Committed: Fri Oct 24 13:32:23 2014 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   | 12 ++---
 .../main/scala/org/apache/spark/SparkEnv.scala  | 49 ++++++++++++++++++--
 .../executor/CoarseGrainedExecutorBackend.scala | 11 +++--
 .../org/apache/spark/executor/Executor.scala    | 11 +++--
 4 files changed, 61 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b563987e/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 55602a9..4565832 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -209,16 +209,10 @@ class SparkContext(config: SparkConf) extends Logging {
   // An asynchronous listener bus for Spark events
   private[spark] val listenerBus = new LiveListenerBus
 
-  // Create the Spark execution environment (cache, map output tracker, etc)
   conf.set("spark.executor.id", "driver")
-  private[spark] val env = SparkEnv.create(
-    conf,
-    "<driver>",
-    conf.get("spark.driver.host"),
-    conf.get("spark.driver.port").toInt,
-    isDriver = true,
-    isLocal = isLocal,
-    listenerBus = listenerBus)
+
+  // Create the Spark execution environment (cache, map output tracker, etc)
+  private[spark] val env = SparkEnv.createDriverEnv(conf, isLocal, listenerBus)
   SparkEnv.set(env)
 
   // Used to store a URL for each static file/jar together with the file's local timestamp

http://git-wip-us.apache.org/repos/asf/spark/blob/b563987e/core/src/main/scala/org/apache/spark/SparkEnv.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 906a00b..5c076e5 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -144,14 +144,46 @@ object SparkEnv extends Logging {
     env
   }
 
-  private[spark] def create(
+  /**
+   * Create a SparkEnv for the driver.
+   */
+  private[spark] def createDriverEnv(
+      conf: SparkConf,
+      isLocal: Boolean,
+      listenerBus: LiveListenerBus): SparkEnv = {
+    assert(conf.contains("spark.driver.host"), "spark.driver.host is not set on the driver!")
+    assert(conf.contains("spark.driver.port"), "spark.driver.port is not set on the driver!")
+    val hostname = conf.get("spark.driver.host")
+    val port = conf.get("spark.driver.port").toInt
+    create(conf, "<driver>", hostname, port, true, isLocal, listenerBus)
+  }
+
+  /**
+   * Create a SparkEnv for an executor.
+   * In coarse-grained mode, the executor provides an actor system that is already instantiated.
+   */
+  private[spark] def createExecutorEnv(
+      conf: SparkConf,
+      executorId: String,
+      hostname: String,
+      port: Int,
+      isLocal: Boolean,
+      actorSystem: ActorSystem = null): SparkEnv = {
+    create(conf, executorId, hostname, port, false, isLocal, defaultActorSystem = actorSystem)
+  }
+
+  /**
+   * Helper method to create a SparkEnv for a driver or an executor.
+   */
+  private def create(
       conf: SparkConf,
       executorId: String,
       hostname: String,
       port: Int,
       isDriver: Boolean,
       isLocal: Boolean,
-      listenerBus: LiveListenerBus = null): SparkEnv = {
+      listenerBus: LiveListenerBus = null,
+      defaultActorSystem: ActorSystem = null): SparkEnv = {
 
     // Listener bus is only used on the driver
     if (isDriver) {
@@ -159,9 +191,16 @@ object SparkEnv extends Logging {
     }
 
     val securityManager = new SecurityManager(conf)
-    val actorSystemName = if (isDriver) driverActorSystemName else executorActorSystemName
-    val (actorSystem, boundPort) = AkkaUtils.createActorSystem(
-      actorSystemName, hostname, port, conf, securityManager)
+
+    // If an existing actor system is already provided, use it.
+    // This is the case when an executor is launched in coarse-grained mode.
+    val (actorSystem, boundPort) =
+      Option(defaultActorSystem) match {
+        case Some(as) => (as, port)
+        case None =>
+          val actorSystemName = if (isDriver) driverActorSystemName else executorActorSystemName
+          AkkaUtils.createActorSystem(actorSystemName, hostname, port, conf, securityManager)
+      }
 
     // Figure out which port Akka actually bound to in case the original port is 0 or occupied.
     // This is so that we tell the executors the correct port to connect to.
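
The core of the fix is this reuse-or-create decision. The same pattern in isolation (the helper name and the null-means-create convention are illustrative, mirroring the diff, not a public Spark API):

    import akka.actor.ActorSystem

    // Reuse a caller-supplied ActorSystem when one exists; otherwise start
    // a fresh one, mirroring the Option(defaultActorSystem) match above.
    def reuseOrCreate(existing: ActorSystem, name: String): ActorSystem =
      Option(existing).getOrElse(ActorSystem(name))

Note that when a system is reused, the caller's port is taken as the bound port, since the executor backend already bound Akka to it.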

http://git-wip-us.apache.org/repos/asf/spark/blob/b563987e/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index c40a3e1..697154d 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -21,7 +21,7 @@ import java.nio.ByteBuffer
 
 import scala.concurrent.Await
 
-import akka.actor.{Actor, ActorSelection, Props}
+import akka.actor.{Actor, ActorSelection, ActorSystem, Props}
 import akka.pattern.Patterns
 import akka.remote.{RemotingLifecycleEvent, DisassociatedEvent}
 
@@ -38,7 +38,8 @@ private[spark] class CoarseGrainedExecutorBackend(
     executorId: String,
     hostPort: String,
     cores: Int,
-    sparkProperties: Seq[(String, String)])
+    sparkProperties: Seq[(String, String)],
+    actorSystem: ActorSystem)
   extends Actor with ActorLogReceive with ExecutorBackend with Logging {
 
   Utils.checkHostPort(hostPort, "Expected hostport")
@@ -57,8 +58,8 @@ private[spark] class CoarseGrainedExecutorBackend(
     case RegisteredExecutor =>
       logInfo("Successfully registered with driver")
       // Make this host instead of hostPort ?
-      executor = new Executor(executorId, Utils.parseHostPort(hostPort)._1, sparkProperties,
-        false)
+      val (hostname, _) = Utils.parseHostPort(hostPort)
+      executor = new Executor(executorId, hostname, sparkProperties, isLocal = false, actorSystem)
 
     case RegisterExecutorFailed(message) =>
       logError("Slave registration failed: " + message)
@@ -135,7 +136,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
       val sparkHostPort = hostname + ":" + boundPort
       actorSystem.actorOf(
         Props(classOf[CoarseGrainedExecutorBackend],
-          driverUrl, executorId, sparkHostPort, cores, props),
+          driverUrl, executorId, sparkHostPort, cores, props, actorSystem),
         name = "Executor")
       workerUrl.foreach { url =>
         actorSystem.actorOf(Props(classOf[WorkerWatcher], url), name = "WorkerWatcher")
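
The new actorSystem constructor argument is threaded through Props, the standard Akka way to pass dependencies into an actor. A self-contained sketch of the same wiring (the actor and names below are made up for illustration):

    import akka.actor.{Actor, ActorSystem, Props}

    // Hypothetical actor that receives the hosting ActorSystem as a
    // constructor argument, as CoarseGrainedExecutorBackend now does.
    class Backend(hostSystem: ActorSystem) extends Actor {
      def receive = {
        case msg => println(s"${hostSystem.name} received: $msg")
      }
    }

    object Demo extends App {
      val system = ActorSystem("executor")
      val backend = system.actorOf(Props(classOf[Backend], system), name = "Backend")
      backend ! "ping"
    }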

http://git-wip-us.apache.org/repos/asf/spark/blob/b563987e/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 0b75b9b..70a46c7 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -26,6 +26,8 @@ import scala.collection.JavaConversions._
 import scala.collection.mutable.{ArrayBuffer, HashMap}
 import scala.util.control.NonFatal
 
+import akka.actor.ActorSystem
+
 import org.apache.spark._
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.scheduler._
@@ -35,12 +37,14 @@ import org.apache.spark.util.{AkkaUtils, Utils}
 
 /**
  * Spark executor used with Mesos, YARN, and the standalone scheduler.
+ * In coarse-grained mode, an existing actor system is provided.
  */
 private[spark] class Executor(
     executorId: String,
     slaveHostname: String,
     properties: Seq[(String, String)],
-    isLocal: Boolean = false)
+    isLocal: Boolean = false,
+    actorSystem: ActorSystem = null)
   extends Logging
 {
   // Application dependencies (added through SparkContext) that we've fetched so far on this node.
@@ -77,8 +81,9 @@ private[spark] class Executor(
   conf.set("spark.executor.id", "executor." + executorId)
   private val env = {
     if (!isLocal) {
-      val _env = SparkEnv.create(conf, executorId, slaveHostname, 0,
-        isDriver = false, isLocal = false)
+      val port = conf.getInt("spark.executor.port", 0)
+      val _env = SparkEnv.createExecutorEnv(
+        conf, executorId, slaveHostname, port, isLocal, actorSystem)
       SparkEnv.set(_env)
       _env.metricsSystem.registerSource(executorSource)
       _env
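
Two details in this hunk: the actorSystem parameter defaults to null so local-mode and fine-grained callers are unaffected, and the executor now honors spark.executor.port instead of always passing 0. A short sketch of the port fallback (the same SparkConf.getInt call as in the diff):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
    // A configured fixed port wins; otherwise 0 lets Akka bind to an
    // ephemeral port chosen by the OS.
    val port = conf.getInt("spark.executor.port", 0)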

