Posted to commits@spark.apache.org by an...@apache.org on 2015/05/27 00:28:53 UTC

spark git commit: [SPARK-6602] [CORE] Remove some places in core that call SparkEnv.actorSystem

Repository: spark
Updated Branches:
  refs/heads/master 2e9a5f229 -> 9f742241c


[SPARK-6602] [CORE] Remove some places in core that call SparkEnv.actorSystem

Author: zsxwing <zs...@gmail.com>

Closes #6333 from zsxwing/remove-actor-system-usage and squashes the following commits:

f125aa6 [zsxwing] Fix YarnAllocatorSuite
ceadcf6 [zsxwing] Change the "port" parameter type of "AkkaUtils.address" to "int"; update ApplicationMaster and YarnAllocator to get the driverUrl from RpcEnv
3239380 [zsxwing] Remove some places in core that call SparkEnv.actorSystem
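
For context, a minimal before/after sketch of the driver URL construction this change migrates, with the call shapes taken from the CoarseMesosSchedulerBackend hunk below (the old form is what is being removed):

    // Old: URL assembled from Akka-specific pieces; port passed as Any.
    val driverUrlOld = AkkaUtils.address(
      AkkaUtils.protocol(sc.env.actorSystem),
      SparkEnv.driverActorSystemName,
      conf.get("spark.driver.host"),
      conf.get("spark.driver.port"),
      CoarseGrainedSchedulerBackend.ENDPOINT_NAME)

    // New: the RpcEnv owns the URI format; host and port travel as a typed RpcAddress.
    val driverUrlNew = sc.env.rpcEnv.uriOf(
      SparkEnv.driverActorSystemName,
      RpcAddress(conf.get("spark.driver.host"), conf.get("spark.driver.port").toInt),
      CoarseGrainedSchedulerBackend.ENDPOINT_NAME)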


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9f742241
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9f742241
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9f742241

Branch: refs/heads/master
Commit: 9f742241cbf07e5e2dadfee8dcc9b382bb2dbea1
Parents: 2e9a5f2
Author: zsxwing <zs...@gmail.com>
Authored: Tue May 26 15:28:49 2015 -0700
Committer: Andrew Or <an...@databricks.com>
Committed: Tue May 26 15:28:49 2015 -0700

----------------------------------------------------------------------
 .../spark/scheduler/TaskSchedulerImpl.scala       | 17 +++++++++++------
 .../mesos/CoarseMesosSchedulerBackend.scala       |  9 ++++-----
 .../scala/org/apache/spark/util/AkkaUtils.scala   |  2 +-
 .../spark/deploy/yarn/ApplicationMaster.scala     | 18 ++++++++++++------
 .../apache/spark/deploy/yarn/YarnAllocator.scala  | 12 ++----------
 .../apache/spark/deploy/yarn/YarnRMClient.scala   |  3 ++-
 .../spark/deploy/yarn/YarnAllocatorSuite.scala    |  1 +
 7 files changed, 33 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9f742241/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index b4b8a63..ed3dde0 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -19,9 +19,9 @@ package org.apache.spark.scheduler
 
 import java.nio.ByteBuffer
 import java.util.{TimerTask, Timer}
+import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.AtomicLong
 
-import scala.concurrent.duration._
 import scala.collection.mutable.ArrayBuffer
 import scala.collection.mutable.HashMap
 import scala.collection.mutable.HashSet
@@ -32,7 +32,7 @@ import org.apache.spark._
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
 import org.apache.spark.scheduler.TaskLocality.TaskLocality
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ThreadUtils, Utils}
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.storage.BlockManagerId
 
@@ -64,6 +64,9 @@ private[spark] class TaskSchedulerImpl(
   // How often to check for speculative tasks
   val SPECULATION_INTERVAL_MS = conf.getTimeAsMs("spark.speculation.interval", "100ms")
 
+  private val speculationScheduler =
+    ThreadUtils.newDaemonSingleThreadScheduledExecutor("task-scheduler-speculation")
+
   // Threshold above which we warn user initial TaskSet may be starved
   val STARVATION_TIMEOUT_MS = conf.getTimeAsMs("spark.starvation.timeout", "15s")
 
@@ -142,10 +145,11 @@ private[spark] class TaskSchedulerImpl(
 
     if (!isLocal && conf.getBoolean("spark.speculation", false)) {
       logInfo("Starting speculative execution thread")
-      sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL_MS milliseconds,
-            SPECULATION_INTERVAL_MS milliseconds) {
-        Utils.tryOrStopSparkContext(sc) { checkSpeculatableTasks() }
-      }(sc.env.actorSystem.dispatcher)
+      speculationScheduler.scheduleAtFixedRate(new Runnable {
+        override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
+          checkSpeculatableTasks()
+        }
+      }, SPECULATION_INTERVAL_MS, SPECULATION_INTERVAL_MS, TimeUnit.MILLISECONDS)
     }
   }
 
@@ -412,6 +416,7 @@ private[spark] class TaskSchedulerImpl(
   }
 
   override def stop() {
+    speculationScheduler.shutdown()
     if (backend != null) {
       backend.stop()
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/9f742241/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index dc59545..aff0865 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -25,9 +25,10 @@ import scala.collection.mutable.{HashMap, HashSet}
 
 import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
 import org.apache.mesos.{Scheduler => MScheduler, _}
+import org.apache.spark.rpc.RpcAddress
 import org.apache.spark.scheduler.TaskSchedulerImpl
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
-import org.apache.spark.util.{AkkaUtils, Utils}
+import org.apache.spark.util.Utils
 import org.apache.spark.{SparkContext, SparkEnv, SparkException, TaskState}
 
 /**
@@ -115,11 +116,9 @@ private[spark] class CoarseMesosSchedulerBackend(
     }
     val command = CommandInfo.newBuilder()
       .setEnvironment(environment)
-    val driverUrl = AkkaUtils.address(
-      AkkaUtils.protocol(sc.env.actorSystem),
+    val driverUrl = sc.env.rpcEnv.uriOf(
       SparkEnv.driverActorSystemName,
-      conf.get("spark.driver.host"),
-      conf.get("spark.driver.port"),
+      RpcAddress(conf.get("spark.driver.host"), conf.get("spark.driver.port").toInt),
       CoarseGrainedSchedulerBackend.ENDPOINT_NAME)
 
     val uri = conf.getOption("spark.executor.uri")

http://git-wip-us.apache.org/repos/asf/spark/blob/9f742241/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index de3316d..7513b1b 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -235,7 +235,7 @@ private[spark] object AkkaUtils extends Logging {
       protocol: String,
       systemName: String,
       host: String,
-      port: Any,
+      port: Int,
       actorName: String): String = {
     s"$protocol://$systemName@$host:$port/user/$actorName"
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/9f742241/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index af4927b..760e458 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -34,7 +34,7 @@ import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, Spar
 import org.apache.spark.SparkException
 import org.apache.spark.deploy.{PythonRunner, SparkHadoopUtil}
 import org.apache.spark.deploy.history.HistoryServer
-import org.apache.spark.scheduler.cluster.YarnSchedulerBackend
+import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, YarnSchedulerBackend}
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
 import org.apache.spark.util._
 
@@ -220,7 +220,7 @@ private[spark] class ApplicationMaster(
     sparkContextRef.compareAndSet(sc, null)
   }
 
-  private def registerAM(uiAddress: String, securityMgr: SecurityManager) = {
+  private def registerAM(_rpcEnv: RpcEnv, uiAddress: String, securityMgr: SecurityManager) = {
     val sc = sparkContextRef.get()
 
     val appId = client.getAttemptId().getApplicationId().toString()
@@ -231,8 +231,14 @@ private[spark] class ApplicationMaster(
         .map { address => s"${address}${HistoryServer.UI_PATH_PREFIX}/${appId}/${attemptId}" }
         .getOrElse("")
 
-    allocator = client.register(yarnConf,
-      if (sc != null) sc.getConf else sparkConf,
+    val _sparkConf = if (sc != null) sc.getConf else sparkConf
+    val driverUrl = _rpcEnv.uriOf(
+        SparkEnv.driverActorSystemName,
+        RpcAddress(_sparkConf.get("spark.driver.host"), _sparkConf.get("spark.driver.port").toInt),
+        CoarseGrainedSchedulerBackend.ENDPOINT_NAME)
+    allocator = client.register(driverUrl,
+      yarnConf,
+      _sparkConf,
       if (sc != null) sc.preferredNodeLocationData else Map(),
       uiAddress,
       historyAddress,
@@ -279,7 +285,7 @@ private[spark] class ApplicationMaster(
         sc.getConf.get("spark.driver.host"),
         sc.getConf.get("spark.driver.port"),
         isClusterMode = true)
-      registerAM(sc.ui.map(_.appUIAddress).getOrElse(""), securityMgr)
+      registerAM(rpcEnv, sc.ui.map(_.appUIAddress).getOrElse(""), securityMgr)
       userClassThread.join()
     }
   }
@@ -289,7 +295,7 @@ private[spark] class ApplicationMaster(
     rpcEnv = RpcEnv.create("sparkYarnAM", Utils.localHostName, port, sparkConf, securityMgr)
     waitForSparkDriver()
     addAmIpFilter()
-    registerAM(sparkConf.get("spark.driver.appUIAddress", ""), securityMgr)
+    registerAM(rpcEnv, sparkConf.get("spark.driver.appUIAddress", ""), securityMgr)
 
     // In client mode the actor will stop the reporter thread.
     reporterThread.join()

http://git-wip-us.apache.org/repos/asf/spark/blob/9f742241/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 8a08f56..21193e7 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -34,10 +34,8 @@ import org.apache.hadoop.yarn.util.RackResolver
 
 import org.apache.log4j.{Level, Logger}
 
-import org.apache.spark.{SparkEnv, Logging, SecurityManager, SparkConf}
+import org.apache.spark.{Logging, SecurityManager, SparkConf}
 import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
-import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
-import org.apache.spark.util.AkkaUtils
 
 /**
  * YarnAllocator is charged with requesting containers from the YARN ResourceManager and deciding
@@ -53,6 +51,7 @@ import org.apache.spark.util.AkkaUtils
  * synchronized.
  */
 private[yarn] class YarnAllocator(
+    driverUrl: String,
     conf: Configuration,
     sparkConf: SparkConf,
     amClient: AMRMClient[ContainerRequest],
@@ -107,13 +106,6 @@ private[yarn] class YarnAllocator(
     new ThreadFactoryBuilder().setNameFormat("ContainerLauncher #%d").setDaemon(true).build())
   launcherPool.allowCoreThreadTimeOut(true)
 
-  private val driverUrl = AkkaUtils.address(
-    AkkaUtils.protocol(securityMgr.akkaSSLOptions.enabled),
-    SparkEnv.driverActorSystemName,
-    sparkConf.get("spark.driver.host"),
-    sparkConf.get("spark.driver.port"),
-    CoarseGrainedSchedulerBackend.ENDPOINT_NAME)
-
   // For testing
   private val launchContainers = sparkConf.getBoolean("spark.yarn.launchContainers", true)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/9f742241/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
index ffe71df..7f533ee 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
@@ -55,6 +55,7 @@ private[spark] class YarnRMClient(args: ApplicationMasterArguments) extends Logg
    * @param uiHistoryAddress Address of the application on the History Server.
    */
   def register(
+      driverUrl: String,
       conf: YarnConfiguration,
       sparkConf: SparkConf,
       preferredNodeLocations: Map[String, Set[SplitInfo]],
@@ -72,7 +73,7 @@ private[spark] class YarnRMClient(args: ApplicationMasterArguments) extends Logg
       amClient.registerApplicationMaster(Utils.localHostName(), 0, uiAddress)
       registered = true
     }
-    new YarnAllocator(conf, sparkConf, amClient, getAttemptId(), args, securityMgr)
+    new YarnAllocator(driverUrl, conf, sparkConf, amClient, getAttemptId(), args, securityMgr)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/9f742241/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
----------------------------------------------------------------------
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
index 455f101..b343cbb 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
@@ -90,6 +90,7 @@ class YarnAllocatorSuite extends FunSuite with Matchers with BeforeAndAfterEach
       "--jar", "somejar.jar",
       "--class", "SomeClass")
     new YarnAllocator(
+      "not used",
       conf,
       sparkConf,
       rmClient,

