You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2013/12/14 09:42:13 UTC
[29/50] [abbrv] git commit: Made running SparkActorSystem specific to
executors only.
Made running SparkActorSystem specific to executors only.
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/09e8be9a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/09e8be9a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/09e8be9a
Branch: refs/heads/master
Commit: 09e8be9a6225203337a01e618851e807a1482603
Parents: 0f24576
Author: Prashant Sharma <pr...@imaginea.com>
Authored: Tue Dec 3 11:27:45 2013 +0530
Committer: Prashant Sharma <pr...@imaginea.com>
Committed: Tue Dec 3 11:27:45 2013 +0530
----------------------------------------------------------------------
.../spark/executor/CoarseGrainedExecutorBackend.scala | 3 ++-
.../src/main/scala/org/apache/spark/util/AkkaUtils.scala | 11 +++++++++--
2 files changed, 11 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/09e8be9a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index dcb12be..406e015 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -97,7 +97,8 @@ private[spark] object CoarseGrainedExecutorBackend {
// Create a new ActorSystem to run the backend, because we can't create a SparkEnv / Executor
// before getting started with all our system properties, etc
- val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkExecutor", hostname, 0)
+ val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkExecutor", hostname, 0,
+ useSparkAS = true)
// set it
val sparkHostPort = hostname + ":" + boundPort
System.setProperty("spark.hostPort", sparkHostPort)
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/09e8be9a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index 407e9ff..f3e2644 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -35,7 +35,9 @@ private[spark] object AkkaUtils {
* Note: the `name` parameter is important, as even if a client sends a message to right
* host + port, if the system name is incorrect, Akka will drop the message.
*/
- def createActorSystem(name: String, host: String, port: Int): (ActorSystem, Int) = {
+ def createActorSystem(name: String, host: String, port: Int,
+ useSparkAS: Boolean = false): (ActorSystem, Int) = {
+
val akkaThreads = System.getProperty("spark.akka.threads", "4").toInt
val akkaBatchSize = System.getProperty("spark.akka.batchSize", "15").toInt
@@ -70,7 +72,12 @@ private[spark] object AkkaUtils {
|akka.remote.log-remote-lifecycle-events = $lifecycleEvents
""".stripMargin)
- val actorSystem = SparkActorSystem(name, akkaConf)
+ val actorSystem = if (useSparkAS) {
+ SparkActorSystem(name, akkaConf)
+ }
+ else {
+ ActorSystem(name, akkaConf)
+ }
val provider = actorSystem.asInstanceOf[ExtendedActorSystem].provider
val boundPort = provider.getDefaultAddress.port.get