Posted to commits@spark.apache.org by rx...@apache.org on 2015/01/26 21:51:35 UTC

spark git commit: [SPARK-5355] use j.u.c.ConcurrentHashMap instead of TrieMap

Repository: spark
Updated Branches:
  refs/heads/master 81251682e -> 142093179


[SPARK-5355] use j.u.c.ConcurrentHashMap instead of TrieMap

j.u.c.ConcurrentHashMap is more battle-tested than Scala's TrieMap.

cc rxin JoshRosen pwendell

Author: Davies Liu <da...@databricks.com>

Closes #4208 from davies/safe-conf and squashes the following commits:

c2182dc [Davies Liu] address comments, fix tests
3a1d821 [Davies Liu] fix test
da14ced [Davies Liu] Merge branch 'master' of github.com:apache/spark into safe-conf
ae4d305 [Davies Liu] change to j.u.c.ConcurrentMap
f8fa1cf [Davies Liu] change to TrieMap
a1d769a [Davies Liu] make SparkConf thread-safe
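
As the message above notes, the patch swaps scala.collection.concurrent.TrieMap
for java.util.concurrent.ConcurrentHashMap. The two differ in contract: the
Scala map behaves like a Map[String, String] (get returns an Option, and
`settings(k) = v` update syntax works), while the j.u.c. map returns null on a
miss and rejects null keys and values. A minimal, hypothetical sketch of the
wrapping idioms the diff below adopts (illustrative only, not part of the
commit):

    import java.util.concurrent.ConcurrentHashMap

    object ConcurrentMapSketch {
      def main(args: Array[String]): Unit = {
        val settings = new ConcurrentHashMap[String, String]()
        settings.put("spark.app.name", "demo")

        // j.u.c. maps return null on a miss, so lookups are wrapped in Option
        val missing = Option(settings.get("spark.no.such.key")) // None
        val present = Option(settings.get("spark.app.name"))    // Some("demo")

        // putIfAbsent is one atomic check-then-set, replacing the racy
        // `if (!settings.contains(key)) settings(key) = value` two-step
        settings.putIfAbsent("spark.app.name", "ignored")       // keeps "demo"

        println(s"missing=$missing present=$present size=${settings.size()}")
      }
    }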


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/14209317
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/14209317
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/14209317

Branch: refs/heads/master
Commit: 142093179a4c40bdd90744191034de7b94a963ff
Parents: 8125168
Author: Davies Liu <da...@databricks.com>
Authored: Mon Jan 26 12:51:32 2015 -0800
Committer: Reynold Xin <rx...@databricks.com>
Committed: Mon Jan 26 12:51:32 2015 -0800

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/SparkConf.scala | 38 ++++++++++----------
 .../deploy/worker/WorkerArgumentsTest.scala     |  4 +--
 .../apache/spark/storage/LocalDirsSuite.scala   |  2 +-
 3 files changed, 23 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/14209317/core/src/main/scala/org/apache/spark/SparkConf.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index f9d4aa4..cd91c8f 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -17,9 +17,11 @@
 
 package org.apache.spark
 
+import java.util.concurrent.ConcurrentHashMap
+
 import scala.collection.JavaConverters._
-import scala.collection.concurrent.TrieMap
-import scala.collection.mutable.{HashMap, LinkedHashSet}
+import scala.collection.mutable.LinkedHashSet
+
 import org.apache.spark.serializer.KryoSerializer
 
 /**
@@ -47,12 +49,12 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
   /** Create a SparkConf that loads defaults from system properties and the classpath */
   def this() = this(true)
 
-  private[spark] val settings = new TrieMap[String, String]()
+  private val settings = new ConcurrentHashMap[String, String]()
 
   if (loadDefaults) {
     // Load any spark.* system properties
     for ((k, v) <- System.getProperties.asScala if k.startsWith("spark.")) {
-      settings(k) = v
+      set(k, v)
     }
   }
 
@@ -64,7 +66,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
     if (value == null) {
       throw new NullPointerException("null value for " + key)
     }
-    settings(key) = value
+    settings.put(key, value)
     this
   }
 
@@ -130,15 +132,13 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
 
   /** Set multiple parameters together */
   def setAll(settings: Traversable[(String, String)]) = {
-    this.settings ++= settings
+    this.settings.putAll(settings.toMap.asJava)
     this
   }
 
   /** Set a parameter if it isn't already configured */
   def setIfMissing(key: String, value: String): SparkConf = {
-    if (!settings.contains(key)) {
-      settings(key) = value
-    }
+    settings.putIfAbsent(key, value)
     this
   }
 
@@ -164,21 +164,23 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
 
   /** Get a parameter; throws a NoSuchElementException if it's not set */
   def get(key: String): String = {
-    settings.getOrElse(key, throw new NoSuchElementException(key))
+    getOption(key).getOrElse(throw new NoSuchElementException(key))
   }
 
   /** Get a parameter, falling back to a default if not set */
   def get(key: String, defaultValue: String): String = {
-    settings.getOrElse(key, defaultValue)
+    getOption(key).getOrElse(defaultValue)
   }
 
   /** Get a parameter as an Option */
   def getOption(key: String): Option[String] = {
-    settings.get(key)
+    Option(settings.get(key))
   }
 
   /** Get all parameters as a list of pairs */
-  def getAll: Array[(String, String)] = settings.toArray
+  def getAll: Array[(String, String)] = {
+    settings.entrySet().asScala.map(x => (x.getKey, x.getValue)).toArray
+  }
 
   /** Get a parameter as an integer, falling back to a default if not set */
   def getInt(key: String, defaultValue: Int): Int = {
@@ -225,11 +227,11 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
   def getAppId: String = get("spark.app.id")
 
   /** Does the configuration contain a given parameter? */
-  def contains(key: String): Boolean = settings.contains(key)
+  def contains(key: String): Boolean = settings.containsKey(key)
 
   /** Copy this object */
   override def clone: SparkConf = {
-    new SparkConf(false).setAll(settings)
+    new SparkConf(false).setAll(getAll)
   }
 
   /**
@@ -241,7 +243,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
   /** Checks for illegal or deprecated config settings. Throws an exception for the former. Not
     * idempotent - may mutate this conf object to convert deprecated settings to supported ones. */
   private[spark] def validateSettings() {
-    if (settings.contains("spark.local.dir")) {
+    if (contains("spark.local.dir")) {
       val msg = "In Spark 1.0 and later spark.local.dir will be overridden by the value set by " +
         "the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone and LOCAL_DIRS in YARN)."
       logWarning(msg)
@@ -266,7 +268,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
     }
 
     // Validate spark.executor.extraJavaOptions
-    settings.get(executorOptsKey).map { javaOpts =>
+    getOption(executorOptsKey).map { javaOpts =>
       if (javaOpts.contains("-Dspark")) {
         val msg = s"$executorOptsKey is not allowed to set Spark options (was '$javaOpts'). " +
           "Set them directly on a SparkConf or in a properties file when using ./bin/spark-submit."
@@ -346,7 +348,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
    * configuration out for debugging.
    */
   def toDebugString: String = {
-    settings.toArray.sorted.map{case (k, v) => k + "=" + v}.mkString("\n")
+    getAll.sorted.map{case (k, v) => k + "=" + v}.mkString("\n")
   }
 }
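
A note on the clone change in the hunk above: settings is now a private
java.util.concurrent.ConcurrentHashMap rather than a Scala collection, so
passing it straight to setAll no longer type-checks (and would hand out the
live backing map); clone instead copies through the getAll snapshot. A
hypothetical standalone sketch of that pattern (MiniConf and copy() are
illustrative names, not Spark's):

    import java.util.concurrent.ConcurrentHashMap
    import scala.collection.JavaConverters._

    // MiniConf is a hypothetical stand-in for the SparkConf pattern above
    class MiniConf {
      private val settings = new ConcurrentHashMap[String, String]()

      def set(key: String, value: String): MiniConf = {
        settings.put(key, value)
        this
      }

      def setAll(kvs: Traversable[(String, String)]): MiniConf = {
        settings.putAll(kvs.toMap.asJava)
        this
      }

      // Snapshot the entries as pairs; the iterator is weakly consistent,
      // so this is safe even while other threads mutate the map
      def getAll: Array[(String, String)] =
        settings.entrySet().asScala.map(e => (e.getKey, e.getValue)).toArray

      // Copy via the snapshot, never via the live backing map
      def copy(): MiniConf = new MiniConf().setAll(getAll)
    }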
 

http://git-wip-us.apache.org/repos/asf/spark/blob/14209317/core/src/test/scala/org/apache/spark/deploy/worker/WorkerArgumentsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerArgumentsTest.scala b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerArgumentsTest.scala
index 1a28a9a..372d7aa 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerArgumentsTest.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerArgumentsTest.scala
@@ -43,7 +43,7 @@ class WorkerArgumentsTest extends FunSuite {
       }
 
       override def clone: SparkConf = {
-        new MySparkConf().setAll(settings)
+        new MySparkConf().setAll(getAll)
       }
     }
     val conf = new MySparkConf()
@@ -62,7 +62,7 @@ class WorkerArgumentsTest extends FunSuite {
       }
 
       override def clone: SparkConf = {
-        new MySparkConf().setAll(settings)
+        new MySparkConf().setAll(getAll)
       }
     }
     val conf = new MySparkConf()

http://git-wip-us.apache.org/repos/asf/spark/blob/14209317/core/src/test/scala/org/apache/spark/storage/LocalDirsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/LocalDirsSuite.scala b/core/src/test/scala/org/apache/spark/storage/LocalDirsSuite.scala
index dae7bf0..8cf951a 100644
--- a/core/src/test/scala/org/apache/spark/storage/LocalDirsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/LocalDirsSuite.scala
@@ -49,7 +49,7 @@ class LocalDirsSuite extends FunSuite {
       }
 
       override def clone: SparkConf = {
-        new MySparkConf().setAll(settings)
+        new MySparkConf().setAll(getAll)
       }
     }
     // spark.local.dir only contains invalid directories, but that's not a problem since
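
Taken together with the test tweaks above, the net effect is that a shared
conf map can be read and written from many threads without external locking.
A hypothetical stress sketch of that access pattern (illustrative only, not
from this commit):

    import java.util.concurrent.ConcurrentHashMap

    object ThreadSafetySketch {
      def main(args: Array[String]): Unit = {
        val settings = new ConcurrentHashMap[String, String]()

        // Eight writers, each also reading a neighbour's key concurrently
        val threads = (1 to 8).map { i =>
          new Thread(new Runnable {
            def run(): Unit = {
              var j = 0
              while (j < 1000) {
                settings.put(s"spark.key.$i", j.toString)
                Option(settings.get(s"spark.key.${(i % 8) + 1}"))
                j += 1
              }
            }
          })
        }
        threads.foreach(_.start())
        threads.foreach(_.join())

        // Eight distinct keys survive; no lost updates or corrupted state
        println(s"entries=${settings.size()}")
      }
    }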

