Posted to commits@spark.apache.org by rx...@apache.org on 2013/09/21 00:06:54 UTC

[34/50] git commit: Changed localProperties to use ThreadLocal (not DynamicVariable).

Changed localProperties to use ThreadLocal (not DynamicVariable).

Because DynamicVariable is backed by an InheritableThreadLocal,
a thread spawned after the properties are set inherits a reference
to the parent thread's Properties object, so the properties can
end up unintentionally shared across threads.
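
For illustration (not part of the commit), here is a minimal standalone
sketch of the difference; the object and variable names are made up:

    import java.util.Properties
    import scala.util.DynamicVariable

    object InheritanceDemo {
      def main(args: Array[String]) {
        // DynamicVariable wraps an InheritableThreadLocal, so a child thread
        // created after the assignment below inherits a reference to the
        // parent thread's Properties object.
        val dynProps = new DynamicVariable[Properties](null)
        // A plain ThreadLocal starts out null in every new thread.
        val tlProps = new ThreadLocal[Properties]

        dynProps.value = new Properties()
        tlProps.set(new Properties())

        val child = new Thread(new Runnable {
          def run() {
            println("DynamicVariable in child: " + dynProps.value) // parent's object
            println("ThreadLocal in child:     " + tlProps.get)    // null
          }
        })
        child.start()
        child.join()
      }
    }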


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/93c42532
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/93c42532
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/93c42532

Branch: refs/heads/master
Commit: 93c425327560e386aa23b42387cea24eae228e79
Parents: 91a59e6
Author: Kay Ousterhout <ka...@gmail.com>
Authored: Wed Sep 11 12:12:20 2013 -0700
Committer: Kay Ousterhout <ka...@gmail.com>
Committed: Wed Sep 11 13:01:39 2013 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala     | 18 +++++++++---------
 1 file changed, 9 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93c42532/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 29407bc..72540c7 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -27,7 +27,6 @@ import scala.collection.generic.Growable
 import scala.collection.JavaConversions._
 import scala.collection.mutable.ArrayBuffer
 import scala.collection.mutable.HashMap
-import scala.util.DynamicVariable
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
@@ -257,20 +256,20 @@ class SparkContext(
   private[spark] var checkpointDir: Option[String] = None
 
   // Thread Local variable that can be used by users to pass information down the stack
-  private val localProperties = new DynamicVariable[Properties](null)
+  private val localProperties = new ThreadLocal[Properties]
 
   def initLocalProperties() {
-    localProperties.value = new Properties()
+    localProperties.set(new Properties())
   }
 
   def setLocalProperty(key: String, value: String) {
-    if (localProperties.value == null) {
-      localProperties.value = new Properties()
+    if (localProperties.get() == null) {
+      localProperties.set(new Properties())
     }
     if (value == null) {
-      localProperties.value.remove(key)
+      localProperties.get.remove(key)
     } else {
-      localProperties.value.setProperty(key, value)
+      localProperties.get.setProperty(key, value)
     }
   }
 
@@ -724,7 +723,7 @@ class SparkContext(
     logInfo("Starting job: " + callSite)
     val start = System.nanoTime
     val result = dagScheduler.runJob(rdd, func, partitions, callSite, allowLocal, resultHandler,
-      localProperties.value)
+      localProperties.get)
     logInfo("Job finished: " + callSite + ", took " + (System.nanoTime - start) / 1e9 + " s")
     rdd.doCheckpoint()
     result
@@ -807,7 +806,8 @@ class SparkContext(
     val callSite = Utils.formatSparkCallSite
     logInfo("Starting job: " + callSite)
     val start = System.nanoTime
-    val result = dagScheduler.runApproximateJob(rdd, func, evaluator, callSite, timeout, localProperties.value)
+    val result = dagScheduler.runApproximateJob(rdd, func, evaluator, callSite, timeout,
+      localProperties.get)
     logInfo("Job finished: " + callSite + ", took " + (System.nanoTime - start) / 1e9 + " s")
     result
   }
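
A practical consequence of the switch (a sketch, not part of the commit,
assuming a SparkContext named sc is in scope): plain ThreadLocal values are
not inherited, so each thread that submits jobs must set its own properties.
The pool name below is only an example:

    // Each job-submitting thread sets its own properties; setLocalProperty
    // lazily creates the per-thread Properties, and a null value removes
    // the key (see setLocalProperty in the diff above).
    val worker = new Thread(new Runnable {
      def run() {
        sc.setLocalProperty("spark.scheduler.pool", "pool1") // example pool name
        sc.parallelize(1 to 100).count()                     // job runs in pool1
        sc.setLocalProperty("spark.scheduler.pool", null)    // clears the key
      }
    })
    worker.start()
    worker.join()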