You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2013/09/23 20:56:04 UTC

[03/16] git commit: Fix issue when local properties are passed from parent to child thread

Fix issue when local properties are passed from parent to child thread


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/ffa5f8e1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/ffa5f8e1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/ffa5f8e1

Branch: refs/heads/master
Commit: ffa5f8e11db26dd616e85b9d941de3590ca3643e
Parents: 2aff798
Author: jerryshao <sa...@intel.com>
Authored: Wed Sep 18 17:33:24 2013 +0800
Committer: jerryshao <sa...@intel.com>
Committed: Wed Sep 18 17:33:24 2013 +0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   |  6 +++-
 .../scala/org/apache/spark/ThreadingSuite.scala | 38 +++++++++++++++++++-
 2 files changed, 42 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/ffa5f8e1/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 72540c7..3922e9a 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -256,7 +256,9 @@ class SparkContext(
   private[spark] var checkpointDir: Option[String] = None
 
   // Thread Local variable that can be used by users to pass information down the stack
-  private val localProperties = new ThreadLocal[Properties]
+  private val localProperties = new InheritableThreadLocal[Properties] {
+    override protected def childValue(parent: Properties): Properties = new Properties(parent)
+  }
 
   def initLocalProperties() {
     localProperties.set(new Properties())
@@ -273,6 +275,8 @@ class SparkContext(
     }
   }
 
+  def getLocalProperty(key: String): String = Option(localProperties.get).map(_.getProperty(key)).getOrElse(null)
+
   /** Set a human readable description of the current job. */
   def setJobDescription(value: String) {
     setLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, value)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/ffa5f8e1/core/src/test/scala/org/apache/spark/ThreadingSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ThreadingSuite.scala b/core/src/test/scala/org/apache/spark/ThreadingSuite.scala
index 69383dd..331f79d 100644
--- a/core/src/test/scala/org/apache/spark/ThreadingSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ThreadingSuite.scala
@@ -40,7 +40,7 @@ object ThreadingSuiteState {
 }
 
 class ThreadingSuite extends FunSuite with LocalSparkContext {
-  
+
   test("accessing SparkContext form a different thread") {
     sc = new SparkContext("local", "test")
     val nums = sc.parallelize(1 to 10, 2)
@@ -149,4 +149,40 @@ class ThreadingSuite extends FunSuite with LocalSparkContext {
       fail("One or more threads didn't see runningThreads = 4")
     }
   }
+
+  test("set local properties in different thread") {
+    sc = new SparkContext("local", "test")
+
+    val threads = (1 to 5).map{ i =>
+      new Thread() {
+        override def run() {
+          sc.setLocalProperty("test", i.toString)
+          assert(sc.getLocalProperty("test") === i.toString)
+        }
+      }
+    }
+
+    threads.foreach(_.start())
+
+    assert(sc.getLocalProperty("test") === null)
+  }
+
+  test("set and get local properties in parent-children thread") {
+    sc = new SparkContext("local", "test")
+    sc.setLocalProperty("test", "parent")
+
+    val threads = (1 to 5).map{ i =>
+      new Thread() {
+        override def run() {
+          assert(sc.getLocalProperty("test") === "parent")
+          sc.setLocalProperty("test", i.toString)
+          assert(sc.getLocalProperty("test") === i.toString)
+        }
+      }
+    }
+
+    threads.foreach(_.start())
+    assert(sc.getLocalProperty("test") === "parent")
+    assert(sc.getLocalProperty("Foo") === null)
+  }
 }