You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2013/11/09 20:55:22 UTC

[1/2] git commit: Propagate the SparkContext local property from the thread that calls the spark-repl to the actual execution thread.

Updated Branches:
  refs/heads/master dd63c548c -> 72a601ec3


Propagate the SparkContext local property from the thread that calls the spark-repl to the actual execution thread.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/31929994
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/31929994
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/31929994

Branch: refs/heads/master
Commit: 319299941dbf4bfa2aaa8b5078e313ca45cb5207
Parents: 3d4ad84
Author: Reynold Xin <rx...@apache.org>
Authored: Sat Nov 9 00:32:14 2013 -0800
Committer: Reynold Xin <rx...@apache.org>
Committed: Sat Nov 9 00:32:14 2013 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   |  6 ++++
 .../org/apache/spark/repl/SparkIMain.scala      | 11 ++++--
 .../scala/org/apache/spark/repl/ReplSuite.scala | 35 ++++++++++++++++++--
 3 files changed, 48 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/31929994/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 4a98491..dbb354d 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -280,6 +280,12 @@ class SparkContext(
     override protected def childValue(parent: Properties): Properties = new Properties(parent)
   }
 
+  private[spark] def getLocalProperties(): Properties = localProperties.get()
+
+  private[spark] def setLocalProperties(props: Properties) {
+    localProperties.set(props)
+  }
+
   def initLocalProperties() {
     localProperties.set(new Properties())
   }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/31929994/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala
----------------------------------------------------------------------
diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala b/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala
index e6e35c9..870e12d 100644
--- a/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala
@@ -878,14 +878,21 @@ class SparkIMain(val settings: Settings, protected val out: PrintWriter) extends
           (message, false)
         }
       }
+
+      // Capture the calling thread's local properties from SparkContext here, and set them
+      // later in the thread that actually executes the line. This makes sure the caller of this
+      // function can pass the right thread-local (inheritable) properties down into Spark.
+      val sc = org.apache.spark.repl.Main.interp.sparkContext
+      val props = if (sc != null) sc.getLocalProperties() else null
       
       try {
         val execution = lineManager.set(originalLine) {
           // MATEI: set the right SparkEnv for our SparkContext, because
           // this execution will happen in a separate thread
-          val sc = org.apache.spark.repl.Main.interp.sparkContext
-          if (sc != null && sc.env != null)
+          if (sc != null && sc.env != null) {
             SparkEnv.set(sc.env)
+            sc.setLocalProperties(props)
+          }
           // Execute the line
           lineRep call "$export"
         }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/31929994/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala
----------------------------------------------------------------------
diff --git a/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala b/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala
index 8f9b632..6e4504d 100644
--- a/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala
+++ b/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala
@@ -21,12 +21,14 @@ import java.io._
 import java.net.URLClassLoader
 
 import scala.collection.mutable.ArrayBuffer
-import scala.collection.JavaConversions._
 
-import org.scalatest.FunSuite
 import com.google.common.io.Files
+import org.scalatest.FunSuite
+import org.apache.spark.SparkContext
+
 
 class ReplSuite extends FunSuite {
+
   def runInterpreter(master: String, input: String): String = {
     val in = new BufferedReader(new StringReader(input + "\n"))
     val out = new StringWriter()
@@ -64,6 +66,35 @@ class ReplSuite extends FunSuite {
            "Interpreter output contained '" + message + "':\n" + output)
   }
 
+  test("propagation of local properties") {
+    // A mock ILoop that doesn't install the SIGINT handler.
+    class ILoop(out: PrintWriter) extends SparkILoop(None, out, None) {
+      settings = new scala.tools.nsc.Settings
+      settings.usejavacp.value = true
+      org.apache.spark.repl.Main.interp = this
+      override def createInterpreter() {
+        intp = new SparkILoopInterpreter
+        intp.setContextClassLoader()
+      }
+    }
+
+    val out = new StringWriter()
+    val interp = new ILoop(new PrintWriter(out))
+    interp.sparkContext = new SparkContext("local", "repl-test")
+    interp.createInterpreter()
+    interp.intp.initialize()
+    interp.sparkContext.setLocalProperty("someKey", "someValue")
+
+    // Make sure the value we set in the caller to interpret is propagated in the thread that
+    // interprets the command.
+    interp.interpret("org.apache.spark.repl.Main.interp.sparkContext.getLocalProperty(\"someKey\")")
+    assert(out.toString.contains("someValue"))
+
+    interp.sparkContext.stop()
+    System.clearProperty("spark.driver.port")
+    System.clearProperty("spark.hostPort")
+  }
+
   test ("simple foreach with accumulator") {
     val output = runInterpreter("local", """
       val accum = sc.accumulator(0)


[2/2] git commit: Merge pull request #152 from rxin/repl

Posted by ma...@apache.org.
Merge pull request #152 from rxin/repl

Propagate SparkContext local properties from spark-repl caller thread to the repl execution thread.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/72a601ec
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/72a601ec
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/72a601ec

Branch: refs/heads/master
Commit: 72a601ec318d017e5ec2b878abeac19e30ebb554
Parents: dd63c54 3192999
Author: Matei Zaharia <ma...@eecs.berkeley.edu>
Authored: Sat Nov 9 11:55:16 2013 -0800
Committer: Matei Zaharia <ma...@eecs.berkeley.edu>
Committed: Sat Nov 9 11:55:16 2013 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   |  6 ++++
 .../org/apache/spark/repl/SparkIMain.scala      | 11 ++++--
 .../scala/org/apache/spark/repl/ReplSuite.scala | 35 ++++++++++++++++++--
 3 files changed, 48 insertions(+), 4 deletions(-)
----------------------------------------------------------------------