Posted to commits@spark.apache.org by pw...@apache.org on 2014/01/07 09:54:36 UTC

[2/4] git commit: Allow users to set arbitrary akka configurations via spark conf.

Allow users to set arbitrary akka configurations via spark conf.
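
For context, a minimal usage sketch (not part of this commit; the akka.* keys shown are
standard Akka settings used purely as examples): any property whose name starts with
"akka." can now be set on a SparkConf and is forwarded to the ActorSystems Spark creates.

    import org.apache.spark.{SparkConf, SparkContext}

    // Hypothetical example: "akka.loglevel" is a standard Akka setting.
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("akka-conf-example")
      .set("akka.loglevel", "DEBUG")      // forwarded: key starts with "akka."
      .set("spark.akka.timeout", "100")   // not forwarded: no "akka." prefix
    val sc = new SparkContext(conf)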


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/b3018811
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/b3018811
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/b3018811

Branch: refs/heads/master
Commit: b3018811e106e6414816380a35c07a8564945d37
Parents: b97ef21
Author: Prashant Sharma <pr...@imaginea.com>
Authored: Mon Jan 6 15:47:40 2014 +0530
Committer: Prashant Sharma <pr...@imaginea.com>
Committed: Tue Jan 7 13:01:43 2014 +0530

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/SparkConf.scala      | 7 +++++++
 core/src/main/scala/org/apache/spark/util/AkkaUtils.scala | 8 +++++---
 docs/configuration.md                                     | 8 ++++++++
 3 files changed, 20 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b3018811/core/src/main/scala/org/apache/spark/SparkConf.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 55f2703..2d437f1 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -172,6 +172,13 @@ class SparkConf(loadDefaults: Boolean) extends Serializable with Cloneable with
           .map{case (k, v) => (k.substring(prefix.length), v)}
   }
 
+  /** Get all akka conf variables set on this SparkConf */
+  def getAkkaConf: Seq[(String, String)] = {
+    getAll.filter {
+      case (k, v) => k.startsWith("akka.")
+    }
+  }
+
   /** Does the configuration contain a given parameter? */
   def contains(key: String): Boolean = settings.contains(key)
 

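The filter above is the whole mechanism: getAkkaConf returns exactly the key/value pairs
whose keys begin with "akka.". A standalone sketch of the same behaviour (the setting
names are made up for illustration):

    val settings = Seq(
      ("akka.loglevel", "DEBUG"),
      ("spark.akka.timeout", "100"),
      ("spark.app.name", "akka-conf-example"))

    // Equivalent of the new getAkkaConf: keep only keys with the "akka." prefix.
    val akkaOnly = settings.filter { case (k, _) => k.startsWith("akka.") }
    // akkaOnly == Seq(("akka.loglevel", "DEBUG"))
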
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b3018811/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index 7df7e3d..2ee3781 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -17,12 +17,13 @@
 
 package org.apache.spark.util
 
+import scala.collection.JavaConversions.mapAsJavaMap
 import scala.concurrent.duration.{Duration, FiniteDuration}
 
 import akka.actor.{ActorSystem, ExtendedActorSystem, IndestructibleActorSystem}
 import com.typesafe.config.ConfigFactory
-import org.apache.log4j.{Level, Logger}
 
+import org.apache.log4j.{Level, Logger}
 import org.apache.spark.SparkConf
 
 /**
@@ -64,7 +65,8 @@ private[spark] object AkkaUtils {
       conf.get("spark.akka.failure-detector.threshold", "300.0").toDouble
     val akkaHeartBeatInterval = conf.get("spark.akka.heartbeat.interval", "1000").toInt
 
-    val akkaConf = ConfigFactory.parseString(
+    val akkaConf = ConfigFactory.parseMap(conf.getAkkaConf.toMap[String, String]).withFallback(
+      ConfigFactory.parseString(
       s"""
       |akka.daemonic = on
       |akka.loggers = [""akka.event.slf4j.Slf4jLogger""]
@@ -86,7 +88,7 @@ private[spark] object AkkaUtils {
       |akka.remote.log-remote-lifecycle-events = $lifecycleEvents
       |akka.log-dead-letters = $lifecycleEvents
       |akka.log-dead-letters-during-shutdown = $lifecycleEvents
-      """.stripMargin)
+      """.stripMargin))
 
     val actorSystem = if (indestructible) {
       IndestructibleActorSystem(name, akkaConf)

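The parseMap(...).withFallback(parseString(...)) chain gives user-supplied akka.* keys
precedence over the defaults Spark generates in the string block; any key the user does
not set falls through to the fallback. A small standalone illustration, assuming only
the Typesafe Config library on the classpath (the values are made up):

    import scala.collection.JavaConversions.mapAsJavaMap
    import com.typesafe.config.ConfigFactory

    val userAkkaConf = Map("akka.loglevel" -> "DEBUG")
    val merged = ConfigFactory.parseMap(userAkkaConf).withFallback(
      ConfigFactory.parseString("""
        |akka.loglevel = "ERROR"
        |akka.daemonic = on
        """.stripMargin))

    merged.getString("akka.loglevel")   // "DEBUG" -- the user's value wins
    merged.getBoolean("akka.daemonic")  // true    -- the fallback default is kept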
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b3018811/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index 09342fe..8a8857b 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -361,6 +361,14 @@ Apart from these, the following properties are also available, and may be useful
   </td>
 </tr>
 <tr>
+  <td>akka.x.y....</td>
+  <td>value</td>
+  <td>
+    Arbitrary akka configuration options can be set directly on the Spark configuration and are applied to all the ActorSystems that Spark creates for that SparkContext and its assigned executors.
+  </td>
+</tr>
+
+<tr>
   <td>spark.shuffle.consolidateFiles</td>
   <td>false</td>
   <td>