Posted to commits@spark.apache.org by pw...@apache.org on 2014/11/12 06:36:57 UTC

[3/8] spark git commit: Support cross building for Scala 2.11
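
Cross building compiles the same source tree against more than one Scala
version. As a general illustration only (hypothetical sbt settings, not taken
from this commit; Spark selects its Scala version through the build's own
profiles), sbt expresses the idea as:

    // With these two settings, `sbt +test` runs the build once per version.
    scalaVersion := "2.10.4"
    crossScalaVersions := Seq("2.10.4", "2.11.2")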

http://git-wip-us.apache.org/repos/asf/spark/blob/daaca14c/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
----------------------------------------------------------------------
diff --git a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
new file mode 100644
index 0000000..f966f25
--- /dev/null
+++ b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.repl
+
+import java.io._
+import java.net.URLClassLoader
+
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.Await
+import scala.concurrent.duration._
+import scala.tools.nsc.interpreter.SparkILoop
+
+import com.google.common.io.Files
+import org.scalatest.FunSuite
+import org.apache.commons.lang3.StringEscapeUtils
+import org.apache.spark.SparkContext
+import org.apache.spark.util.Utils
+
+
+
+class ReplSuite extends FunSuite {
+
+  def runInterpreter(master: String, input: String): String = {
+    val CONF_EXECUTOR_CLASSPATH = "spark.executor.extraClassPath"
+
+    val in = new BufferedReader(new StringReader(input + "\n"))
+    val out = new StringWriter()
+    val cl = getClass.getClassLoader
+    var paths = new ArrayBuffer[String]
+    if (cl.isInstanceOf[URLClassLoader]) {
+      val urlLoader = cl.asInstanceOf[URLClassLoader]
+      for (url <- urlLoader.getURLs) {
+        if (url.getProtocol == "file") {
+          paths += url.getFile
+        }
+      }
+    }
+    val classpath = paths.mkString(File.pathSeparator)
+
+    val oldExecutorClasspath = System.getProperty(CONF_EXECUTOR_CLASSPATH)
+    System.setProperty(CONF_EXECUTOR_CLASSPATH, classpath)
+
+    System.setProperty("spark.master", master)
+    val interp = {
+      new SparkILoop(in, new PrintWriter(out))
+    }
+    org.apache.spark.repl.Main.interp = interp
+    Main.s.processArguments(List("-classpath", classpath), true)
+    Main.main(Array()) // call main
+    org.apache.spark.repl.Main.interp = null
+
+    if (oldExecutorClasspath != null) {
+      System.setProperty(CONF_EXECUTOR_CLASSPATH, oldExecutorClasspath)
+    } else {
+      System.clearProperty(CONF_EXECUTOR_CLASSPATH)
+    }
+    return out.toString
+  }
+
+  def assertContains(message: String, output: String) {
+    val isContain = output.contains(message)
+    assert(isContain,
+      "Interpreter output did not contain '" + message + "':\n" + output)
+  }
+
+  def assertDoesNotContain(message: String, output: String) {
+    val isContain = output.contains(message)
+    assert(!isContain,
+      "Interpreter output contained '" + message + "':\n" + output)
+  }
+
+  test("propagation of local properties") {
+    // A mock ILoop that doesn't install the SIGINT handler.
+    class ILoop(out: PrintWriter) extends SparkILoop(None, out) {
+      settings = new scala.tools.nsc.Settings
+      settings.usejavacp.value = true
+      org.apache.spark.repl.Main.interp = this
+      override def createInterpreter() {
+        intp = new SparkILoopInterpreter
+        intp.setContextClassLoader()
+      }
+    }
+
+    val out = new StringWriter()
+    Main.interp = new ILoop(new PrintWriter(out))
+    Main.sparkContext = new SparkContext("local", "repl-test")
+    Main.interp.createInterpreter()
+
+    Main.sparkContext.setLocalProperty("someKey", "someValue")
+
+    // Make sure the value set in the calling thread before interpret() is propagated to the
+    // thread that interprets the command.
+    Main.interp.interpret("org.apache.spark.repl.Main.sparkContext.getLocalProperty(\"someKey\")")
+    assert(out.toString.contains("someValue"))
+
+    Main.sparkContext.stop()
+    System.clearProperty("spark.driver.port")
+  }
+
+  test("simple foreach with accumulator") {
+    val output = runInterpreter("local",
+      """
+        |val accum = sc.accumulator(0)
+        |sc.parallelize(1 to 10).foreach(x => accum += x)
+        |accum.value
+      """.stripMargin)
+    assertDoesNotContain("error:", output)
+    assertDoesNotContain("Exception", output)
+    assertContains("res1: Int = 55", output)
+  }
+
+  test("external vars") {
+    val output = runInterpreter("local",
+      """
+        |var v = 7
+        |sc.parallelize(1 to 10).map(x => v).collect.reduceLeft(_+_)
+        |v = 10
+        |sc.parallelize(1 to 10).map(x => v).collect.reduceLeft(_+_)
+      """.stripMargin)
+    assertDoesNotContain("error:", output)
+    assertDoesNotContain("Exception", output)
+    assertContains("res0: Int = 70", output)
+    assertContains("res1: Int = 100", output)
+  }
+
+  test("external classes") {
+    val output = runInterpreter("local",
+      """
+        |class C {
+        |def foo = 5
+        |}
+        |sc.parallelize(1 to 10).map(x => (new C).foo).collect.reduceLeft(_+_)
+      """.stripMargin)
+    assertDoesNotContain("error:", output)
+    assertDoesNotContain("Exception", output)
+    assertContains("res0: Int = 50", output)
+  }
+
+  test("external functions") {
+    val output = runInterpreter("local",
+      """
+        |def double(x: Int) = x + x
+        |sc.parallelize(1 to 10).map(x => double(x)).collect.reduceLeft(_+_)
+      """.stripMargin)
+    assertDoesNotContain("error:", output)
+    assertDoesNotContain("Exception", output)
+    assertContains("res0: Int = 110", output)
+  }
+
+  test("external functions that access vars") {
+    val output = runInterpreter("local",
+      """
+        |var v = 7
+        |def getV() = v
+        |sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_)
+        |v = 10
+        |sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_)
+      """.stripMargin)
+    assertDoesNotContain("error:", output)
+    assertDoesNotContain("Exception", output)
+    assertContains("res0: Int = 70", output)
+    assertContains("res1: Int = 100", output)
+  }
+
+  test("broadcast vars") {
+    // Test that the value that a broadcast var had when it was created is used,
+    // even if that variable is then modified in the driver program
+    // TODO: This doesn't actually work for arrays when we run in local mode!
+    val output = runInterpreter("local",
+      """
+        |var array = new Array[Int](5)
+        |val broadcastArray = sc.broadcast(array)
+        |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
+        |array(0) = 5
+        |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
+      """.stripMargin)
+    assertDoesNotContain("error:", output)
+    assertDoesNotContain("Exception", output)
+    assertContains("res0: Array[Int] = Array(0, 0, 0, 0, 0)", output)
+    assertContains("res2: Array[Int] = Array(5, 0, 0, 0, 0)", output)
+  }
+
+  test("interacting with files") {
+    val tempDir = Files.createTempDir()
+    tempDir.deleteOnExit()
+    val out = new FileWriter(tempDir + "/input")
+    out.write("Hello world!\n")
+    out.write("What's up?\n")
+    out.write("Goodbye\n")
+    out.close()
+    val output = runInterpreter("local",
+      """
+        |var file = sc.textFile("%s").cache()
+        |file.count()
+        |file.count()
+        |file.count()
+      """.stripMargin.format(StringEscapeUtils.escapeJava(
+        tempDir.getAbsolutePath + File.separator + "input")))
+    assertDoesNotContain("error:", output)
+    assertDoesNotContain("Exception", output)
+    assertContains("res0: Long = 3", output)
+    assertContains("res1: Long = 3", output)
+    assertContains("res2: Long = 3", output)
+    Utils.deleteRecursively(tempDir)
+  }
+
+  test("local-cluster mode") {
+    val output = runInterpreter("local-cluster[1,1,512]",
+      """
+        |var v = 7
+        |def getV() = v
+        |sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_)
+        |v = 10
+        |sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_)
+        |var array = new Array[Int](5)
+        |val broadcastArray = sc.broadcast(array)
+        |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
+        |array(0) = 5
+        |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
+      """.stripMargin)
+    assertDoesNotContain("error:", output)
+    assertDoesNotContain("Exception", output)
+    assertContains("res0: Int = 70", output)
+    assertContains("res1: Int = 100", output)
+    assertContains("res2: Array[Int] = Array(0, 0, 0, 0, 0)", output)
+    assertContains("res4: Array[Int] = Array(0, 0, 0, 0, 0)", output)
+  }
+
+  test("SPARK-1199 two instances of same class don't type check.") {
+    val output = runInterpreter("local-cluster[1,1,512]",
+      """
+        |case class Sum(exp: String, exp2: String)
+        |val a = Sum("A", "B")
+        |def b(a: Sum): String = a match { case Sum(_, _) => "Found Sum" }
+        |b(a)
+      """.stripMargin)
+    assertDoesNotContain("error:", output)
+    assertDoesNotContain("Exception", output)
+  }
+
+  test("SPARK-2452 compound statements.") {
+    val output = runInterpreter("local",
+      """
+        |val x = 4 ; def f() = x
+        |f()
+      """.stripMargin)
+    assertDoesNotContain("error:", output)
+    assertDoesNotContain("Exception", output)
+  }
+
+  test("SPARK-2576 importing SQLContext.createSchemaRDD.") {
+    // We need to use local-cluster to test this case.
+    val output = runInterpreter("local-cluster[1,1,512]",
+      """
+        |val sqlContext = new org.apache.spark.sql.SQLContext(sc)
+        |import sqlContext.createSchemaRDD
+        |case class TestCaseClass(value: Int)
+        |sc.parallelize(1 to 10).map(x => TestCaseClass(x)).toSchemaRDD.collect
+      """.stripMargin)
+    assertDoesNotContain("error:", output)
+    assertDoesNotContain("Exception", output)
+  }
+
+  test("SPARK-2632 importing a method from non serializable class and not using it.") {
+    val output = runInterpreter("local",
+    """
+      |class TestClass() { def testMethod = 3 }
+      |val t = new TestClass
+      |import t.testMethod
+      |case class TestCaseClass(value: Int)
+      |sc.parallelize(1 to 10).map(x => TestCaseClass(x)).collect
+    """.stripMargin)
+    assertDoesNotContain("error:", output)
+    assertDoesNotContain("Exception", output)
+  }
+
+  if (System.getenv("MESOS_NATIVE_LIBRARY") != null) {
+    test("running on Mesos") {
+      val output = runInterpreter("localquiet",
+        """
+          |var v = 7
+          |def getV() = v
+          |sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_)
+          |v = 10
+          |sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_)
+          |var array = new Array[Int](5)
+          |val broadcastArray = sc.broadcast(array)
+          |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
+          |array(0) = 5
+          |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
+        """.stripMargin)
+      assertDoesNotContain("error:", output)
+      assertDoesNotContain("Exception", output)
+      assertContains("res0: Int = 70", output)
+      assertContains("res1: Int = 100", output)
+      assertContains("res2: Array[Int] = Array(0, 0, 0, 0, 0)", output)
+      assertContains("res4: Array[Int] = Array(0, 0, 0, 0, 0)", output)
+    }
+  }
+
+  test("collecting objects of class defined in repl") {
+    val output = runInterpreter("local[2]",
+      """
+        |case class Foo(i: Int)
+        |val ret = sc.parallelize((1 to 100).map(Foo), 10).collect
+      """.stripMargin)
+    assertDoesNotContain("error:", output)
+    assertDoesNotContain("Exception", output)
+    assertContains("ret: Array[Foo] = Array(Foo(1),", output)
+  }
+}
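
For reference, the harness pattern runInterpreter uses above (script in via a
BufferedReader, transcript out via a StringWriter) also works against the
stock Scala 2.11 ILoop. A minimal self-contained sketch with no Spark
dependency (names are illustrative):

    import java.io.{BufferedReader, PrintWriter, StringReader, StringWriter}
    import scala.tools.nsc.Settings
    import scala.tools.nsc.interpreter.ILoop

    object ReplHarnessSketch {
      // Feed a script through an embedded REPL and return its transcript.
      def run(script: String): String = {
        val out = new StringWriter()
        val in = new BufferedReader(new StringReader(script + "\n"))
        val loop = new ILoop(in, new PrintWriter(out))
        val settings = new Settings
        settings.usejavacp.value = true // compile against the JVM classpath
        loop.process(settings)
        out.toString
      }

      def main(args: Array[String]): Unit =
        println(run("val x = 1 + 1"))
    }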

http://git-wip-us.apache.org/repos/asf/spark/blob/daaca14c/repl/src/main/scala/org/apache/spark/repl/Main.scala
----------------------------------------------------------------------
diff --git a/repl/src/main/scala/org/apache/spark/repl/Main.scala b/repl/src/main/scala/org/apache/spark/repl/Main.scala
deleted file mode 100644
index 14b448d..0000000
--- a/repl/src/main/scala/org/apache/spark/repl/Main.scala
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.repl
-
-import scala.collection.mutable.Set
-
-object Main {
-  private var _interp: SparkILoop = _
-
-  def interp = _interp
-
-  def interp_=(i: SparkILoop) { _interp = i }
-
-  def main(args: Array[String]) {
-    _interp = new SparkILoop
-    _interp.process(args)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/daaca14c/repl/src/main/scala/org/apache/spark/repl/SparkCommandLine.scala
----------------------------------------------------------------------
diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkCommandLine.scala b/repl/src/main/scala/org/apache/spark/repl/SparkCommandLine.scala
deleted file mode 100644
index 0581694..0000000
--- a/repl/src/main/scala/org/apache/spark/repl/SparkCommandLine.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.repl
-
-import scala.tools.nsc.{Settings, CompilerCommand}
-import scala.Predef._
-
-/**
- * Command class enabling Spark-specific command line options (provided by
- * <i>org.apache.spark.repl.SparkRunnerSettings</i>).
- */
-class SparkCommandLine(args: List[String], override val settings: Settings)
-    extends CompilerCommand(args, settings) {
-
-  def this(args: List[String], error: String => Unit) {
-    this(args, new SparkRunnerSettings(error))
-  }
-
-  def this(args: List[String]) {
-    this(args, str => Console.println("Error: " + str))
-  }
-}
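
The deleted SparkCommandLine above is a thin subclass of
scala.tools.nsc.CompilerCommand, which parses compiler-style arguments into a
Settings instance. A minimal sketch of that underlying API (standard compiler
classes; the object name is illustrative):

    import scala.tools.nsc.{CompilerCommand, Settings}

    object CommandLineSketch {
      def main(args: Array[String]): Unit = {
        val settings = new Settings(msg => Console.err.println("Error: " + msg))
        val cmd = new CompilerCommand(List("-usejavacp", "-deprecation"), settings)
        println(cmd.ok)                   // true when every argument parsed
        println(settings.usejavacp.value) // true
      }
    }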

http://git-wip-us.apache.org/repos/asf/spark/blob/daaca14c/repl/src/main/scala/org/apache/spark/repl/SparkExprTyper.scala
----------------------------------------------------------------------
diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkExprTyper.scala b/repl/src/main/scala/org/apache/spark/repl/SparkExprTyper.scala
deleted file mode 100644
index f8432c8..0000000
--- a/repl/src/main/scala/org/apache/spark/repl/SparkExprTyper.scala
+++ /dev/null
@@ -1,114 +0,0 @@
-// scalastyle:off
-
-/* NSC -- new Scala compiler
- * Copyright 2005-2013 LAMP/EPFL
- * @author  Paul Phillips
- */
-
-package org.apache.spark.repl
-
-import scala.tools.nsc._
-import scala.tools.nsc.interpreter._
-
-import scala.reflect.internal.util.BatchSourceFile
-import scala.tools.nsc.ast.parser.Tokens.EOF
-
-import org.apache.spark.Logging
-
-trait SparkExprTyper extends Logging {
-  val repl: SparkIMain
-
-  import repl._
-  import global.{ reporter => _, Import => _, _ }
-  import definitions._
-  import syntaxAnalyzer.{ UnitParser, UnitScanner, token2name }
-  import naming.freshInternalVarName
-
-  object codeParser extends { val global: repl.global.type = repl.global } with CodeHandlers[Tree] {
-    def applyRule[T](code: String, rule: UnitParser => T): T = {
-      reporter.reset()
-      val scanner = newUnitParser(code)
-      val result  = rule(scanner)
-
-      if (!reporter.hasErrors)
-        scanner.accept(EOF)
-
-      result
-    }
-
-    def defns(code: String) = stmts(code) collect { case x: DefTree => x }
-    def expr(code: String)  = applyRule(code, _.expr())
-    def stmts(code: String) = applyRule(code, _.templateStats())
-    def stmt(code: String)  = stmts(code).last  // guaranteed nonempty
-  }
-
-  /** Parse a line into a sequence of trees. Returns None if the input is incomplete. */
-  def parse(line: String): Option[List[Tree]] = debugging(s"""parse("$line")""")  {
-    var isIncomplete = false
-    reporter.withIncompleteHandler((_, _) => isIncomplete = true) {
-      val trees = codeParser.stmts(line)
-      if (reporter.hasErrors) {
-        Some(Nil)
-      } else if (isIncomplete) {
-        None
-      } else {
-        Some(trees)
-      }
-    }
-  }
-  // def parsesAsExpr(line: String) = {
-  //   import codeParser._
-  //   (opt expr line).isDefined
-  // }
-
-  def symbolOfLine(code: String): Symbol = {
-    def asExpr(): Symbol = {
-      val name  = freshInternalVarName()
-      // Typing it with a lazy val would give us the right type, but runs
-      // into compiler bugs with things like existentials, so we compile it
-      // behind a def and strip the NullaryMethodType which wraps the expr.
-      val line = "def " + name + " = {\n" + code + "\n}"
-
-      interpretSynthetic(line) match {
-        case IR.Success =>
-          val sym0 = symbolOfTerm(name)
-          // drop NullaryMethodType
-          val sym = sym0.cloneSymbol setInfo afterTyper(sym0.info.finalResultType)
-          if (sym.info.typeSymbol eq UnitClass) NoSymbol else sym
-        case _          => NoSymbol
-      }
-    }
-    def asDefn(): Symbol = {
-      val old = repl.definedSymbolList.toSet
-
-      interpretSynthetic(code) match {
-        case IR.Success =>
-          repl.definedSymbolList filterNot old match {
-            case Nil        => NoSymbol
-            case sym :: Nil => sym
-            case syms       => NoSymbol.newOverloaded(NoPrefix, syms)
-          }
-        case _ => NoSymbol
-      }
-    }
-    beQuietDuring(asExpr()) orElse beQuietDuring(asDefn())
-  }
-
-  private var typeOfExpressionDepth = 0
-  def typeOfExpression(expr: String, silent: Boolean = true): Type = {
-    if (typeOfExpressionDepth > 2) {
-      logDebug("Terminating typeOfExpression recursion for expression: " + expr)
-      return NoType
-    }
-    typeOfExpressionDepth += 1
-    // Don't presently have a good way to suppress undesirable success output
-    // while letting errors through, so it is first trying it silently: if there
-    // is an error, and errors are desired, then it re-evaluates non-silently
-    // to induce the error message.
-    try beSilentDuring(symbolOfLine(expr).tpe) match {
-      case NoType if !silent => symbolOfLine(expr).tpe // generate error
-      case tpe               => tpe
-    }
-    finally typeOfExpressionDepth -= 1
-  }
-}
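
The core trick in the deleted SparkExprTyper is typing a snippet without
evaluating it for its value: compile the code behind a synthetic def, then
strip the NullaryMethodType wrapper from the resulting symbol's info. Outside
a REPL session, scala-reflect's ToolBox gives a rough equivalent; a minimal
sketch (an approximation, not the removed API; needs scala-compiler on the
classpath):

    import scala.reflect.runtime.currentMirror
    import scala.tools.reflect.ToolBox

    object TypeOfExprSketch {
      def main(args: Array[String]): Unit = {
        val tb = currentMirror.mkToolBox()
        // Parse and typecheck the snippet without running it.
        val typed = tb.typecheck(tb.parse("List(1, 2, 3).map(_ + 1)"))
        println(typed.tpe) // List[Int]
      }
    }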

http://git-wip-us.apache.org/repos/asf/spark/blob/daaca14c/repl/src/main/scala/org/apache/spark/repl/SparkHelper.scala
----------------------------------------------------------------------
diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkHelper.scala b/repl/src/main/scala/org/apache/spark/repl/SparkHelper.scala
deleted file mode 100644
index 5340951..0000000
--- a/repl/src/main/scala/org/apache/spark/repl/SparkHelper.scala
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package scala.tools.nsc
-
-object SparkHelper {
-  def explicitParentLoader(settings: Settings) = settings.explicitParentLoader
-}
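
Note the package declaration: the helper is placed inside scala.tools.nsc
itself so that it can read explicitParentLoader, whose access is restricted to
that package. The same trick in isolation (object name is illustrative):

    // Living in the target package grants access to its package-private members.
    package scala.tools.nsc

    object ParentLoaderSketch {
      def parentLoader(settings: Settings): Option[ClassLoader] =
        settings.explicitParentLoader
    }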

http://git-wip-us.apache.org/repos/asf/spark/blob/daaca14c/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
----------------------------------------------------------------------
diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala b/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
deleted file mode 100644
index e56b74e..0000000
--- a/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
+++ /dev/null
@@ -1,1091 +0,0 @@
-// scalastyle:off
-
-/* NSC -- new Scala compiler
- * Copyright 2005-2013 LAMP/EPFL
- * @author Alexander Spoon
- */
-
-package org.apache.spark.repl
-
-
-import java.net.URL
-
-import scala.reflect.io.AbstractFile
-import scala.tools.nsc._
-import scala.tools.nsc.backend.JavaPlatform
-import scala.tools.nsc.interpreter._
-
-import scala.tools.nsc.interpreter.{Results => IR}
-import Predef.{println => _, _}
-import java.io.{BufferedReader, FileReader}
-import java.net.URI
-import java.util.concurrent.locks.ReentrantLock
-import scala.sys.process.Process
-import scala.tools.nsc.interpreter.session._
-import scala.util.Properties.{jdkHome, javaVersion}
-import scala.tools.util.{Javap}
-import scala.annotation.tailrec
-import scala.collection.mutable.ListBuffer
-import scala.concurrent.ops
-import scala.tools.nsc.util._
-import scala.tools.nsc.interpreter._
-import scala.tools.nsc.io.{File, Directory}
-import scala.reflect.NameTransformer._
-import scala.tools.nsc.util.ScalaClassLoader._
-import scala.tools.util._
-import scala.language.{implicitConversions, existentials, postfixOps}
-import scala.reflect.{ClassTag, classTag}
-import scala.tools.reflect.StdRuntimeTags._
-
-import java.lang.{Class => jClass}
-import scala.reflect.api.{Mirror, TypeCreator, Universe => ApiUniverse}
-
-import org.apache.spark.Logging
-import org.apache.spark.SparkConf
-import org.apache.spark.SparkContext
-import org.apache.spark.util.Utils
-
-/** The Scala interactive shell.  It provides a read-eval-print loop
- *  around the Interpreter class.
- *  After instantiation, clients should call the main() method.
- *
- *  If no in0 is specified, then input will come from the console, and
- *  the class will attempt to provide input editing features such as
- *  input history.
- *
- *  @author Moez A. Abdel-Gawad
- *  @author  Lex Spoon
- *  @version 1.2
- */
-class SparkILoop(in0: Option[BufferedReader], protected val out: JPrintWriter,
-               val master: Option[String])
-                extends AnyRef
-                   with LoopCommands
-                   with SparkILoopInit
-                   with Logging
-{
-  def this(in0: BufferedReader, out: JPrintWriter, master: String) = this(Some(in0), out, Some(master))
-  def this(in0: BufferedReader, out: JPrintWriter) = this(Some(in0), out, None)
-  def this() = this(None, new JPrintWriter(Console.out, true), None)
-
-  var in: InteractiveReader = _   // the input stream from which commands come
-  var settings: Settings = _
-  var intp: SparkIMain = _
-
-  @deprecated("Use `intp` instead.", "2.9.0") def interpreter = intp
-  @deprecated("Use `intp` instead.", "2.9.0") def interpreter_= (i: SparkIMain): Unit = intp = i
-
-  /** Having inherited the difficult "var-ness" of the repl instance,
-   *  I'm trying to work around it by moving operations into a class from
- *  which it will appear to be a stable prefix.
-   */
-  private def onIntp[T](f: SparkIMain => T): T = f(intp)
-
-  class IMainOps[T <: SparkIMain](val intp: T) {
-    import intp._
-    import global._
-
-    def printAfterTyper(msg: => String) =
-      intp.reporter printMessage afterTyper(msg)
-
-    /** Strip NullaryMethodType artifacts. */
-    private def replInfo(sym: Symbol) = {
-      sym.info match {
-        case NullaryMethodType(restpe) if sym.isAccessor  => restpe
-        case info                                         => info
-      }
-    }
-    def echoTypeStructure(sym: Symbol) =
-      printAfterTyper("" + deconstruct.show(replInfo(sym)))
-
-    def echoTypeSignature(sym: Symbol, verbose: Boolean) = {
-      if (verbose) SparkILoop.this.echo("// Type signature")
-      printAfterTyper("" + replInfo(sym))
-
-      if (verbose) {
-        SparkILoop.this.echo("\n// Internal Type structure")
-        echoTypeStructure(sym)
-      }
-    }
-  }
-  implicit def stabilizeIMain(intp: SparkIMain) = new IMainOps[intp.type](intp)
-
-  /** TODO -
-   *  -n normalize
-   *  -l label with case class parameter names
-   *  -c complete - leave nothing out
-   */
-  private def typeCommandInternal(expr: String, verbose: Boolean): Result = {
-    onIntp { intp =>
-      val sym = intp.symbolOfLine(expr)
-      if (sym.exists) intp.echoTypeSignature(sym, verbose)
-      else ""
-    }
-  }
-
-  var sparkContext: SparkContext = _
-
-  override def echoCommandMessage(msg: String) {
-    intp.reporter printMessage msg
-  }
-
-  // def isAsync = !settings.Yreplsync.value
-  def isAsync = false
-  // lazy val power = new Power(intp, new StdReplVals(this))(tagOfStdReplVals, classTag[StdReplVals])
-  def history = in.history
-
-  /** The context class loader at the time this object was created */
-  protected val originalClassLoader = Utils.getContextOrSparkClassLoader
-
-  // classpath entries added via :cp
-  var addedClasspath: String = ""
-
-  /** A reverse list of commands to replay if the user requests a :replay */
-  var replayCommandStack: List[String] = Nil
-
-  /** A list of commands to replay if the user requests a :replay */
-  def replayCommands = replayCommandStack.reverse
-
-  /** Record a command for replay should the user request a :replay */
-  def addReplay(cmd: String) = replayCommandStack ::= cmd
-
-  def savingReplayStack[T](body: => T): T = {
-    val saved = replayCommandStack
-    try body
-    finally replayCommandStack = saved
-  }
-  def savingReader[T](body: => T): T = {
-    val saved = in
-    try body
-    finally in = saved
-  }
-
-
-  def sparkCleanUp(){
-    echo("Stopping spark context.")
-    intp.beQuietDuring {
-      command("sc.stop()")
-    }
-  }
-  /** Close the interpreter and set the var to null. */
-  def closeInterpreter() {
-    if (intp ne null) {
-      sparkCleanUp()
-      intp.close()
-      intp = null
-    }
-  }
-
-  class SparkILoopInterpreter extends SparkIMain(settings, out) {
-    outer =>
-
-    override lazy val formatting = new Formatting {
-      def prompt = SparkILoop.this.prompt
-    }
-    override protected def parentClassLoader = SparkHelper.explicitParentLoader(settings).getOrElse(classOf[SparkILoop].getClassLoader)
-  }
-
-  /** Create a new interpreter. */
-  def createInterpreter() {
-    require(settings != null)
-
-    if (addedClasspath != "") settings.classpath.append(addedClasspath)
-    val addedJars =
-      if (Utils.isWindows) {
-        // Strip any URI scheme prefix so we can add the correct path to the classpath
-        // e.g. file:/C:/my/path.jar -> C:/my/path.jar
-        SparkILoop.getAddedJars.map { jar => new URI(jar).getPath.stripPrefix("/") }
-      } else {
-        SparkILoop.getAddedJars
-      }
-    // work around for Scala bug
-    val totalClassPath = addedJars.foldLeft(
-      settings.classpath.value)((l, r) => ClassPath.join(l, r))
-    this.settings.classpath.value = totalClassPath
-
-    intp = new SparkILoopInterpreter
-  }
-
-  /** print a friendly help message */
-  def helpCommand(line: String): Result = {
-    if (line == "") helpSummary()
-    else uniqueCommand(line) match {
-      case Some(lc) => echo("\n" + lc.longHelp)
-      case _        => ambiguousError(line)
-    }
-  }
-  private def helpSummary() = {
-    val usageWidth  = commands map (_.usageMsg.length) max
-    val formatStr   = "%-" + usageWidth + "s %s %s"
-
-    echo("All commands can be abbreviated, e.g. :he instead of :help.")
-    echo("Those marked with a * have more detailed help, e.g. :help imports.\n")
-
-    commands foreach { cmd =>
-      val star = if (cmd.hasLongHelp) "*" else " "
-      echo(formatStr.format(cmd.usageMsg, star, cmd.help))
-    }
-  }
-  private def ambiguousError(cmd: String): Result = {
-    matchingCommands(cmd) match {
-      case Nil  => echo(cmd + ": no such command.  Type :help for help.")
-      case xs   => echo(cmd + " is ambiguous: did you mean " + xs.map(":" + _.name).mkString(" or ") + "?")
-    }
-    Result(true, None)
-  }
-  private def matchingCommands(cmd: String) = commands filter (_.name startsWith cmd)
-  private def uniqueCommand(cmd: String): Option[LoopCommand] = {
-    // this lets us add commands willy-nilly and only requires enough of the command name to disambiguate
-    matchingCommands(cmd) match {
-      case List(x)  => Some(x)
-      // exact match OK even if otherwise appears ambiguous
-      case xs       => xs find (_.name == cmd)
-    }
-  }
-  private var fallbackMode = false 
-
-  private def toggleFallbackMode() {
-    val old = fallbackMode
-    fallbackMode = !old
-    System.setProperty("spark.repl.fallback", fallbackMode.toString)
-    echo(s"""
-      |Switched ${if (old) "off" else "on"} fallback mode without restarting.
-      |       If you have defined classes in the repl, it would 
-      |be good to redefine them incase you plan to use them. If you still run
-      |into issues it would be good to restart the repl and turn on `:fallback` 
-      |mode as first command.
-      """.stripMargin)
-  }
-
-  /** Show the history */
-  lazy val historyCommand = new LoopCommand("history", "show the history (optional num is commands to show)") {
-    override def usage = "[num]"
-    def defaultLines = 20
-
-    def apply(line: String): Result = {
-      if (history eq NoHistory)
-        return "No history available."
-
-      val xs      = words(line)
-      val current = history.index
-      val count   = try xs.head.toInt catch { case _: Exception => defaultLines }
-      val lines   = history.asStrings takeRight count
-      val offset  = current - lines.size + 1
-
-      for ((line, index) <- lines.zipWithIndex)
-        echo("%3d  %s".format(index + offset, line))
-    }
-  }
-
-  // When you know you are most likely breaking into the middle
-  // of a line being typed.  This softens the blow.
-  protected def echoAndRefresh(msg: String) = {
-    echo("\n" + msg)
-    in.redrawLine()
-  }
-  protected def echo(msg: String) = {
-    out println msg
-    out.flush()
-  }
-  protected def echoNoNL(msg: String) = {
-    out print msg
-    out.flush()
-  }
-
-  /** Search the history */
-  def searchHistory(_cmdline: String) {
-    val cmdline = _cmdline.toLowerCase
-    val offset  = history.index - history.size + 1
-
-    for ((line, index) <- history.asStrings.zipWithIndex ; if line.toLowerCase contains cmdline)
-      echo("%d %s".format(index + offset, line))
-  }
-
-  private var currentPrompt = Properties.shellPromptString
-  def setPrompt(prompt: String) = currentPrompt = prompt
-  /** Prompt to print when awaiting input */
-  def prompt = currentPrompt
-
-  import LoopCommand.{ cmd, nullary }
-
-  /** Standard commands */
-  lazy val standardCommands = List(
-    cmd("cp", "<path>", "add a jar or directory to the classpath", addClasspath),
-    cmd("help", "[command]", "print this summary or command-specific help", helpCommand),
-    historyCommand,
-    cmd("h?", "<string>", "search the history", searchHistory),
-    cmd("imports", "[name name ...]", "show import history, identifying sources of names", importsCommand),
-    cmd("implicits", "[-v]", "show the implicits in scope", implicitsCommand),
-    cmd("javap", "<path|class>", "disassemble a file or class name", javapCommand),
-    cmd("load", "<path>", "load and interpret a Scala file", loadCommand),
-    nullary("paste", "enter paste mode: all input up to ctrl-D compiled together", pasteCommand),
-//    nullary("power", "enable power user mode", powerCmd),
-    nullary("quit", "exit the repl", () => Result(false, None)),
-    nullary("replay", "reset execution and replay all previous commands", replay),
-    nullary("reset", "reset the repl to its initial state, forgetting all session entries", resetCommand),
-    shCommand,
-    nullary("silent", "disable/enable automatic printing of results", verbosity),
-    nullary("fallback", """
-                           |disable/enable advanced repl changes, these fix some issues but may introduce others. 
-                           |This mode will be removed once these fixes stablize""".stripMargin, toggleFallbackMode),
-    cmd("type", "[-v] <expr>", "display the type of an expression without evaluating it", typeCommand),
-    nullary("warnings", "show the suppressed warnings from the most recent line which had any", warningsCommand)
-  )
-
-  /** Power user commands */
-  lazy val powerCommands: List[LoopCommand] = List(
-    // cmd("phase", "<phase>", "set the implicit phase for power commands", phaseCommand)
-  )
-
-  // private def dumpCommand(): Result = {
-  //   echo("" + power)
-  //   history.asStrings takeRight 30 foreach echo
-  //   in.redrawLine()
-  // }
-  // private def valsCommand(): Result = power.valsDescription
-
-  private val typeTransforms = List(
-    "scala.collection.immutable." -> "immutable.",
-    "scala.collection.mutable."   -> "mutable.",
-    "scala.collection.generic."   -> "generic.",
-    "java.lang."                  -> "jl.",
-    "scala.runtime."              -> "runtime."
-  )
-
-  private def importsCommand(line: String): Result = {
-    val tokens    = words(line)
-    val handlers  = intp.languageWildcardHandlers ++ intp.importHandlers
-    val isVerbose = tokens contains "-v"
-
-    handlers.filterNot(_.importedSymbols.isEmpty).zipWithIndex foreach {
-      case (handler, idx) =>
-        val (types, terms) = handler.importedSymbols partition (_.name.isTypeName)
-        val imps           = handler.implicitSymbols
-        val found          = tokens filter (handler importsSymbolNamed _)
-        val typeMsg        = if (types.isEmpty) "" else types.size + " types"
-        val termMsg        = if (terms.isEmpty) "" else terms.size + " terms"
-        val implicitMsg    = if (imps.isEmpty) "" else imps.size + " are implicit"
-        val foundMsg       = if (found.isEmpty) "" else found.mkString(" // imports: ", ", ", "")
-        val statsMsg       = List(typeMsg, termMsg, implicitMsg) filterNot (_ == "") mkString ("(", ", ", ")")
-
-        intp.reporter.printMessage("%2d) %-30s %s%s".format(
-          idx + 1,
-          handler.importString,
-          statsMsg,
-          foundMsg
-        ))
-    }
-  }
-
-  private def implicitsCommand(line: String): Result = onIntp { intp =>
-    import intp._
-    import global._
-
-    def p(x: Any) = intp.reporter.printMessage("" + x)
-
-    // If an argument is given, only show a source with that
-    // in its name somewhere.
-    val args     = line split "\\s+"
-    val filtered = intp.implicitSymbolsBySource filter {
-      case (source, syms) =>
-        (args contains "-v") || {
-          if (line == "") (source.fullName.toString != "scala.Predef")
-          else (args exists (source.name.toString contains _))
-        }
-    }
-
-    if (filtered.isEmpty)
-      return "No implicits have been imported other than those in Predef."
-
-    filtered foreach {
-      case (source, syms) =>
-        p("/* " + syms.size + " implicit members imported from " + source.fullName + " */")
-
-        // This groups the members by where the symbol is defined
-        val byOwner = syms groupBy (_.owner)
-        val sortedOwners = byOwner.toList sortBy { case (owner, _) => afterTyper(source.info.baseClasses indexOf owner) }
-
-        sortedOwners foreach {
-          case (owner, members) =>
-            // Within each owner, we cluster results based on the final result type
-            // if there are more than a couple, and sort each cluster based on name.
-            // This is really just trying to make the 100 or so implicits imported
-            // by default into something readable.
-            val memberGroups: List[List[Symbol]] = {
-              val groups = members groupBy (_.tpe.finalResultType) toList
-              val (big, small) = groups partition (_._2.size > 3)
-              val xss = (
-                (big sortBy (_._1.toString) map (_._2)) :+
-                (small flatMap (_._2))
-              )
-
-              xss map (xs => xs sortBy (_.name.toString))
-            }
-
-            val ownerMessage = if (owner == source) " defined in " else " inherited from "
-            p("  /* " + members.size + ownerMessage + owner.fullName + " */")
-
-            memberGroups foreach { group =>
-              group foreach (s => p("  " + intp.symbolDefString(s)))
-              p("")
-            }
-        }
-        p("")
-    }
-  }
-
-  private def findToolsJar() = {
-    val jdkPath = Directory(jdkHome)
-    val jar     = jdkPath / "lib" / "tools.jar" toFile;
-
-    if (jar isFile)
-      Some(jar)
-    else if (jdkPath.isDirectory)
-      jdkPath.deepFiles find (_.name == "tools.jar")
-    else None
-  }
-  private def addToolsJarToLoader() = {
-    val cl = findToolsJar match {
-      case Some(tools) => ScalaClassLoader.fromURLs(Seq(tools.toURL), intp.classLoader)
-      case _           => intp.classLoader
-    }
-    if (Javap.isAvailable(cl)) {
-      logDebug(":javap available.")
-      cl
-    }
-    else {
-      logDebug(":javap unavailable: no tools.jar at " + jdkHome)
-      intp.classLoader
-    }
-  }
-
-  protected def newJavap() = new JavapClass(addToolsJarToLoader(), new SparkIMain.ReplStrippingWriter(intp)) {
-    override def tryClass(path: String): Array[Byte] = {
-      val hd :: rest = path split '.' toList;
-      // If there are dots in the name, the first segment is the
-      // key to finding it.
-      if (rest.nonEmpty) {
-        intp optFlatName hd match {
-          case Some(flat) =>
-            val clazz = flat :: rest mkString NAME_JOIN_STRING
-            val bytes = super.tryClass(clazz)
-            if (bytes.nonEmpty) bytes
-            else super.tryClass(clazz + MODULE_SUFFIX_STRING)
-          case _          => super.tryClass(path)
-        }
-      }
-      else {
-        // Look for Foo first, then Foo$, but if Foo$ is given explicitly,
-        // we have to drop the $ to find object Foo, then tack it back onto
-        // the end of the flattened name.
-        def className  = intp flatName path
-        def moduleName = (intp flatName path.stripSuffix(MODULE_SUFFIX_STRING)) + MODULE_SUFFIX_STRING
-
-        val bytes = super.tryClass(className)
-        if (bytes.nonEmpty) bytes
-        else super.tryClass(moduleName)
-      }
-    }
-  }
-  // private lazy val javap = substituteAndLog[Javap]("javap", NoJavap)(newJavap())
-  private lazy val javap =
-    try newJavap()
-    catch { case _: Exception => null }
-
-  // Still todo: modules.
-  private def typeCommand(line0: String): Result = {
-    line0.trim match {
-      case ""                      => ":type [-v] <expression>"
-      case s if s startsWith "-v " => typeCommandInternal(s stripPrefix "-v " trim, true)
-      case s                       => typeCommandInternal(s, false)
-    }
-  }
-
-  private def warningsCommand(): Result = {
-    if (intp.lastWarnings.isEmpty)
-      "Can't find any cached warnings."
-    else
-      intp.lastWarnings foreach { case (pos, msg) => intp.reporter.warning(pos, msg) }
-  }
-
-  private def javapCommand(line: String): Result = {
-    if (javap == null)
-      ":javap unavailable, no tools.jar at %s.  Set JDK_HOME.".format(jdkHome)
-    else if (javaVersion startsWith "1.7")
-      ":javap not yet working with java 1.7"
-    else if (line == "")
-      ":javap [-lcsvp] [path1 path2 ...]"
-    else
-      javap(words(line)) foreach { res =>
-        if (res.isError) return "Failed: " + res.value
-        else res.show()
-      }
-  }
-
-  private def wrapCommand(line: String): Result = {
-    def failMsg = "Argument to :wrap must be the name of a method with signature [T](=> T): T"
-    onIntp { intp =>
-      import intp._
-      import global._
-
-      words(line) match {
-        case Nil            =>
-          intp.executionWrapper match {
-            case ""   => "No execution wrapper is set."
-            case s    => "Current execution wrapper: " + s
-          }
-        case "clear" :: Nil =>
-          intp.executionWrapper match {
-            case ""   => "No execution wrapper is set."
-            case s    => intp.clearExecutionWrapper() ; "Cleared execution wrapper."
-          }
-        case wrapper :: Nil =>
-          intp.typeOfExpression(wrapper) match {
-            case PolyType(List(targ), MethodType(List(arg), restpe)) =>
-              intp setExecutionWrapper intp.pathToTerm(wrapper)
-              "Set wrapper to '" + wrapper + "'"
-            case tp =>
-              failMsg + "\nFound: <unknown>"
-          }
-        case _ => failMsg
-      }
-    }
-  }
-
-  private def pathToPhaseWrapper = intp.pathToTerm("$r") + ".phased.atCurrent"
-  // private def phaseCommand(name: String): Result = {
-  //   val phased: Phased = power.phased
-  //   import phased.NoPhaseName
-
-  //   if (name == "clear") {
-  //     phased.set(NoPhaseName)
-  //     intp.clearExecutionWrapper()
-  //     "Cleared active phase."
-  //   }
-  //   else if (name == "") phased.get match {
-  //     case NoPhaseName => "Usage: :phase <expr> (e.g. typer, erasure.next, erasure+3)"
-  //     case ph          => "Active phase is '%s'.  (To clear, :phase clear)".format(phased.get)
-  //   }
-  //   else {
-  //     val what = phased.parse(name)
-  //     if (what.isEmpty || !phased.set(what))
-  //       "'" + name + "' does not appear to represent a valid phase."
-  //     else {
-  //       intp.setExecutionWrapper(pathToPhaseWrapper)
-  //       val activeMessage =
-  //         if (what.toString.length == name.length) "" + what
-  //         else "%s (%s)".format(what, name)
-
-  //       "Active phase is now: " + activeMessage
-  //     }
-  //   }
-  // }
-
-  /** Available commands */
-  def commands: List[LoopCommand] = standardCommands /*++ (
-    if (isReplPower) powerCommands else Nil
-  )*/
-
-  private val replayQuestionMessage =
-    """|That entry seems to have slain the compiler.  Shall I replay
-       |your session? I can re-run each line except the last one.
-       |[y/n]
-    """.trim.stripMargin
-
-  private def crashRecovery(ex: Throwable): Boolean = {
-    echo(ex.toString)
-    ex match {
-      case _: NoSuchMethodError | _: NoClassDefFoundError =>
-        echo("\nUnrecoverable error.")
-        throw ex
-      case _  =>
-        def fn(): Boolean =
-          try in.readYesOrNo(replayQuestionMessage, { echo("\nYou must enter y or n.") ; fn() })
-          catch { case _: RuntimeException => false }
-
-        if (fn()) replay()
-        else echo("\nAbandoning crashed session.")
-    }
-    true
-  }
-
-  /** The main read-eval-print loop for the repl.  It calls
-   *  command() for each line of input, and stops when
-   *  command() returns false.
-   */
-  def loop() {
-    def readOneLine() = {
-      out.flush()
-      in readLine prompt
-    }
-    // return false if repl should exit
-    def processLine(line: String): Boolean = {
-      if (isAsync) {
-        if (!awaitInitialized()) return false
-        runThunks()
-      }
-      if (line eq null) false               // assume null means EOF
-      else command(line) match {
-        case Result(false, _)           => false
-        case Result(_, Some(finalLine)) => addReplay(finalLine) ; true
-        case _                          => true
-      }
-    }
-    def innerLoop() {
-      val shouldContinue = try {
-        processLine(readOneLine())
-      } catch {case t: Throwable => crashRecovery(t)}
-      if (shouldContinue)
-        innerLoop()
-    }
-    innerLoop()
-  }
-
-  /** interpret all lines from a specified file */
-  def interpretAllFrom(file: File) {
-    savingReader {
-      savingReplayStack {
-        file applyReader { reader =>
-          in = SimpleReader(reader, out, false)
-          echo("Loading " + file + "...")
-          loop()
-        }
-      }
-    }
-  }
-
-  /** create a new interpreter and replay the given commands */
-  def replay() {
-    reset()
-    if (replayCommandStack.isEmpty)
-      echo("Nothing to replay.")
-    else for (cmd <- replayCommands) {
-      echo("Replaying: " + cmd)  // flush because maybe cmd will have its own output
-      command(cmd)
-      echo("")
-    }
-  }
-  def resetCommand() {
-    echo("Resetting repl state.")
-    if (replayCommandStack.nonEmpty) {
-      echo("Forgetting this session history:\n")
-      replayCommands foreach echo
-      echo("")
-      replayCommandStack = Nil
-    }
-    if (intp.namedDefinedTerms.nonEmpty)
-      echo("Forgetting all expression results and named terms: " + intp.namedDefinedTerms.mkString(", "))
-    if (intp.definedTypes.nonEmpty)
-      echo("Forgetting defined types: " + intp.definedTypes.mkString(", "))
-
-    reset()
-  }
-
-  def reset() {
-    intp.reset()
-    // unleashAndSetPhase()
-  }
-
-  /** fork a shell and run a command */
-  lazy val shCommand = new LoopCommand("sh", "run a shell command (result is implicitly => List[String])") {
-    override def usage = "<command line>"
-    def apply(line: String): Result = line match {
-      case ""   => showUsage()
-      case _    =>
-        val toRun = classOf[ProcessResult].getName + "(" + string2codeQuoted(line) + ")"
-        intp interpret toRun
-        ()
-    }
-  }
-
-  def withFile(filename: String)(action: File => Unit) {
-    val f = File(filename)
-
-    if (f.exists) action(f)
-    else echo("That file does not exist")
-  }
-
-  def loadCommand(arg: String) = {
-    var shouldReplay: Option[String] = None
-    withFile(arg)(f => {
-      interpretAllFrom(f)
-      shouldReplay = Some(":load " + arg)
-    })
-    Result(true, shouldReplay)
-  }
-
-  def addAllClasspath(args: Seq[String]): Unit = {
-    var added = false
-    var totalClasspath = ""
-    for (arg <- args) {
-      val f = File(arg).normalize
-      if (f.exists) {
-        added = true
-        addedClasspath = ClassPath.join(addedClasspath, f.path)
-        totalClasspath = ClassPath.join(settings.classpath.value, addedClasspath)
-        intp.addUrlsToClassPath(f.toURI.toURL)
-        sparkContext.addJar(f.toURI.toURL.getPath)
-      }
-    }
-  }
-
-  def addClasspath(arg: String): Unit = {
-    val f = File(arg).normalize
-    if (f.exists) {
-      addedClasspath = ClassPath.join(addedClasspath, f.path)
-      intp.addUrlsToClassPath(f.toURI.toURL)
-      sparkContext.addJar(f.toURI.toURL.getPath)
-      echo("Added '%s'.  Your new classpath is:\n\"%s\"".format(f.path, intp.global.classPath.asClasspathString))
-    }
-    else echo("The path '" + f + "' doesn't seem to exist.")
-  }
-
-
-  def powerCmd(): Result = {
-    if (isReplPower) "Already in power mode."
-    else enablePowerMode(false)
-  }
-
-  def enablePowerMode(isDuringInit: Boolean) = {
-    // replProps.power setValue true
-    // unleashAndSetPhase()
-    // asyncEcho(isDuringInit, power.banner)
-  }
-  // private def unleashAndSetPhase() {
-//     if (isReplPower) {
-// //      power.unleash()
-//       // Set the phase to "typer"
-//       intp beSilentDuring phaseCommand("typer")
-//     }
-//   }
-
-  def asyncEcho(async: Boolean, msg: => String) {
-    if (async) asyncMessage(msg)
-    else echo(msg)
-  }
-
-  def verbosity() = {
-    // val old = intp.printResults
-    // intp.printResults = !old
-    // echo("Switched " + (if (old) "off" else "on") + " result printing.")
-  }
-
-  /** Run one command submitted by the user.  Two values are returned:
-    * (1) whether to keep running, (2) the line to record for replay,
-    * if any. */
-  def command(line: String): Result = {
-    if (line startsWith ":") {
-      val cmd = line.tail takeWhile (x => !x.isWhitespace)
-      uniqueCommand(cmd) match {
-        case Some(lc) => lc(line.tail stripPrefix cmd dropWhile (_.isWhitespace))
-        case _        => ambiguousError(cmd)
-      }
-    }
-    else if (intp.global == null) Result(false, None)  // Notice failure to create compiler
-    else Result(true, interpretStartingWith(line))
-  }
-
-  private def readWhile(cond: String => Boolean) = {
-    Iterator continually in.readLine("") takeWhile (x => x != null && cond(x))
-  }
-
-  def pasteCommand(): Result = {
-    echo("// Entering paste mode (ctrl-D to finish)\n")
-    val code = readWhile(_ => true) mkString "\n"
-    echo("\n// Exiting paste mode, now interpreting.\n")
-    intp interpret code
-    ()
-  }
-
-  private object paste extends Pasted {
-    val ContinueString = "     | "
-    val PromptString   = "scala> "
-
-    def interpret(line: String): Unit = {
-      echo(line.trim)
-      intp interpret line
-      echo("")
-    }
-
-    def transcript(start: String) = {
-      echo("\n// Detected repl transcript paste: ctrl-D to finish.\n")
-      apply(Iterator(start) ++ readWhile(_.trim != PromptString.trim))
-    }
-  }
-  import paste.{ ContinueString, PromptString }
-
-  /** Interpret expressions starting with the first line.
-    * Read lines until a complete compilation unit is available
-    * or until a syntax error has been seen.  If a full unit is
-    * read, go ahead and interpret it.  Return the full string
-    * to be recorded for replay, if any.
-    */
-  def interpretStartingWith(code: String): Option[String] = {
-    // signal to the completion object that non-completion input has been received
-    in.completion.resetVerbosity()
-
-    def reallyInterpret = {
-      val reallyResult = intp.interpret(code)
-      (reallyResult, reallyResult match {
-        case IR.Error       => None
-        case IR.Success     => Some(code)
-        case IR.Incomplete  =>
-          if (in.interactive && code.endsWith("\n\n")) {
-            echo("You typed two blank lines.  Starting a new command.")
-            None
-          }
-          else in.readLine(ContinueString) match {
-            case null =>
-              // we know compilation is going to fail since we're at EOF and the
-              // parser thinks the input is still incomplete, but since this is
-              // a file being read non-interactively we want to fail.  So we send
-              // it straight to the compiler for the nice error message.
-              intp.compileString(code)
-              None
-
-            case line => interpretStartingWith(code + "\n" + line)
-          }
-      })
-    }
-
-    /** Here we place ourselves between the user and the interpreter and examine
-     *  the input they are ostensibly submitting.  We intervene in several cases:
-     *
-     *  1) If the line starts with "scala> " it is assumed to be an interpreter paste.
-     *  2) If the line starts with "." (but not ".." or "./") it is treated as an invocation
-     *     on the previous result.
-     *  3) If the Completion object's execute returns Some(_), we inject that value
-     *     and avoid the interpreter, as it's likely not valid scala code.
-     */
-    if (code == "") None
-    else if (!paste.running && code.trim.startsWith(PromptString)) {
-      paste.transcript(code)
-      None
-    }
-    else if (Completion.looksLikeInvocation(code) && intp.mostRecentVar != "") {
-      interpretStartingWith(intp.mostRecentVar + code)
-    }
-    else if (code.trim startsWith "//") {
-      // line comment, do nothing
-      None
-    }
-    else
-      reallyInterpret._2
-  }
-
-  // runs :load `file` on any files passed via -i
-  def loadFiles(settings: Settings) = settings match {
-    case settings: SparkRunnerSettings =>
-      for (filename <- settings.loadfiles.value) {
-        val cmd = ":load " + filename
-        command(cmd)
-        addReplay(cmd)
-        echo("")
-      }
-    case _ =>
-  }
-
-  /** Tries to create a JLineReader, falling back to SimpleReader,
-   *  unless settings or properties are such that it should start
-   *  with SimpleReader.
-   */
-  def chooseReader(settings: Settings): InteractiveReader = {
-    if (settings.Xnojline.value || Properties.isEmacsShell)
-      SimpleReader()
-    else try new SparkJLineReader(
-      if (settings.noCompletion.value) NoCompletion
-      else new SparkJLineCompletion(intp)
-    )
-    catch {
-      case ex @ (_: Exception | _: NoClassDefFoundError) =>
-        echo("Failed to created SparkJLineReader: " + ex + "\nFalling back to SimpleReader.")
-        SimpleReader()
-    }
-  }
-
-  val u: scala.reflect.runtime.universe.type = scala.reflect.runtime.universe
-  val m = u.runtimeMirror(Utils.getSparkClassLoader)
-  private def tagOfStaticClass[T: ClassTag]: u.TypeTag[T] =
-    u.TypeTag[T](
-      m,
-      new TypeCreator {
-        def apply[U <: ApiUniverse with Singleton](m: Mirror[U]): U # Type =
-          m.staticClass(classTag[T].runtimeClass.getName).toTypeConstructor.asInstanceOf[U # Type]
-      })
-
-  def process(settings: Settings): Boolean = savingContextLoader {
-    if (getMaster() == "yarn-client") System.setProperty("SPARK_YARN_MODE", "true")
-
-    this.settings = settings
-    createInterpreter()
-
-    // sets in to some kind of reader depending on environmental cues
-    in = in0 match {
-      case Some(reader) => SimpleReader(reader, out, true)
-      case None         =>
-        // some post-initialization
-        chooseReader(settings) match {
-          case x: SparkJLineReader => addThunk(x.consoleReader.postInit) ; x
-          case x                   => x
-        }
-    }
-    lazy val tagOfSparkIMain = tagOfStaticClass[org.apache.spark.repl.SparkIMain]
-    // Bind intp somewhere out of the regular namespace where
-    // we can get at it in generated code.
-    addThunk(intp.quietBind(NamedParam[SparkIMain]("$intp", intp)(tagOfSparkIMain, classTag[SparkIMain])))
-    addThunk({
-      import scala.tools.nsc.io._
-      import Properties.userHome
-      import scala.compat.Platform.EOL
-      val autorun = replProps.replAutorunCode.option flatMap (f => io.File(f).safeSlurp())
-      if (autorun.isDefined) intp.quietRun(autorun.get)
-    })
-
-    addThunk(printWelcome())
-    addThunk(initializeSpark())
-
-    // if it is broken on startup, go ahead and exit
-    if (intp.reporter.hasErrors)
-      return false
-
-    // This is about the illusion of snappiness.  We call initialize()
-    // which spins off a separate thread, then print the prompt and try
-    // our best to look ready.  The interlocking lazy vals tend to
-    // inter-deadlock, so we break the cycle with a single asynchronous
-    // message to an actor.
-    if (isAsync) {
-      intp initialize initializedCallback()
-      createAsyncListener() // listens for signal to run postInitialization
-    }
-    else {
-      intp.initializeSynchronous()
-      postInitialization()
-    }
-    // printWelcome()
-
-    loadFiles(settings)
-
-    try loop()
-    catch AbstractOrMissingHandler()
-    finally closeInterpreter()
-
-    true
-  }
-
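-  // Builds the shell's SparkConf, pointing executors at the REPL's class
-  // server so they can fetch classes compiled from user input.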
-  def createSparkContext(): SparkContext = {
-    val execUri = System.getenv("SPARK_EXECUTOR_URI")
-    val jars = SparkILoop.getAddedJars
-    val conf = new SparkConf()
-      .setMaster(getMaster())
-      .setAppName("Spark shell")
-      .setJars(jars)
-      .set("spark.repl.class.uri", intp.classServer.uri)
-    if (execUri != null) {
-      conf.set("spark.executor.uri", execUri)
-    }
-    sparkContext = new SparkContext(conf)
-    logInfo("Created spark context..")
-    sparkContext
-  }
-
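-  // Master resolution order: an explicitly supplied `master`, then the
-  // spark.master system property, then the MASTER environment variable,
-  // falling back to local[*].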
-  private def getMaster(): String = {
-    val master = this.master match {
-      case Some(m) => m
-      case None =>
-        val envMaster = sys.env.get("MASTER")
-        val propMaster = sys.props.get("spark.master")
-        propMaster.orElse(envMaster).getOrElse("local[*]")
-    }
-    master
-  }
-
-  /** process command-line arguments and do as they request */
-  def process(args: Array[String]): Boolean = {
-    val command = new SparkCommandLine(args.toList, msg => echo(msg))
-    def neededHelp(): String =
-      (if (command.settings.help.value) command.usageMsg + "\n" else "") +
-      (if (command.settings.Xhelp.value) command.xusageMsg + "\n" else "")
-
-    // if they asked for no help and command is valid, we call the real main
-    neededHelp() match {
-      case ""     => command.ok && process(command.settings)
-      case help   => echoNoNL(help) ; true
-    }
-  }
-
-  @deprecated("Use `process` instead", "2.9.0")
-  def main(settings: Settings): Unit = process(settings)
-}
-
-object SparkILoop {
-  implicit def loopToInterpreter(repl: SparkILoop): SparkIMain = repl.intp
-  private def echo(msg: String) = Console println msg
-
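-  // spark.jars (when non-empty) takes precedence over the ADD_JARS
-  // environment variable; entries are URI-resolved and blanks dropped.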
-  def getAddedJars: Array[String] = {
-    val envJars = sys.env.get("ADD_JARS")
-    val propJars = sys.props.get("spark.jars").flatMap { p =>
-      if (p == "") None else Some(p)
-    }
-    val jars = propJars.orElse(envJars).getOrElse("")
-    Utils.resolveURIs(jars).split(",").filter(_.nonEmpty)
-  }
-
-  // Designed primarily for use by test code: takes a String containing a
-  // bunch of code and prints out a transcript of what it would look like
-  // if you'd just typed it into the repl.
-  def runForTranscript(code: String, settings: Settings): String = {
-    import java.io.{ BufferedReader, StringReader, OutputStreamWriter }
-
-    stringFromStream { ostream =>
-      Console.withOut(ostream) {
-        val output = new JPrintWriter(new OutputStreamWriter(ostream), true) {
-          override def write(str: String) = {
-            // completely skip continuation lines
-            if (str forall (ch => ch.isWhitespace || ch == '|')) ()
-            // print a newline on empty scala prompts
-            else if ((str contains '\n') && (str.trim == "scala> ")) super.write("\n")
-            else super.write(str)
-          }
-        }
-        val input = new BufferedReader(new StringReader(code)) {
-          override def readLine(): String = {
-            val s = super.readLine()
-            // helping out by printing the line being interpreted.
-            if (s != null)
-              output.println(s)
-            s
-          }
-        }
-        val repl = new SparkILoop(input, output)
-
-        if (settings.classpath.isDefault)
-          settings.classpath.value = sys.props("java.class.path")
-
-        getAddedJars.foreach(settings.classpath.append(_))
-
-        repl process settings
-      }
-    }
-  }
-
-  /** Creates an interpreter loop with default settings and feeds
-   *  the given code to it as input.
-   */
-  def run(code: String, sets: Settings = new Settings): String = {
-    import java.io.{ BufferedReader, StringReader, OutputStreamWriter }
-
-    stringFromStream { ostream =>
-      Console.withOut(ostream) {
-        val input    = new BufferedReader(new StringReader(code))
-        val output   = new JPrintWriter(new OutputStreamWriter(ostream), true)
-        val repl     = new SparkILoop(input, output)
-
-        if (sets.classpath.isDefault)
-          sets.classpath.value = sys.props("java.class.path")
-
-        repl process sets
-      }
-    }
-  }
-
-  def run(lines: List[String]): String = run(lines map (_ + "\n") mkString)
-}
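
A note for reviewers skimming the deleted companion object: runForTranscript
and run were the programmatic entry points the test suites drove. A minimal
usage sketch (the snippet fed to the loop is illustrative, not part of this
patch):

    // Feed code to a fresh loop and capture everything it printed.
    val transcript: String = SparkILoop.run(
      """
        |val n = 1 + 1
        |println(n)
      """.stripMargin)
    // transcript now holds the interleaved prompts and results.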

http://git-wip-us.apache.org/repos/asf/spark/blob/daaca14c/repl/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala
----------------------------------------------------------------------
diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala b/repl/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala
deleted file mode 100644
index 7667a9c..0000000
--- a/repl/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala
+++ /dev/null
@@ -1,147 +0,0 @@
-// scalastyle:off
-
-/* NSC -- new Scala compiler
- * Copyright 2005-2013 LAMP/EPFL
- * @author Paul Phillips
- */
-
-package org.apache.spark.repl
-
-import scala.tools.nsc._
-import scala.tools.nsc.interpreter._
-
-import scala.reflect.internal.util.Position
-import scala.util.control.Exception.ignoring
-import scala.tools.nsc.util.stackTraceString
-
-import org.apache.spark.SPARK_VERSION
-
-/**
- *  Machinery for the asynchronous initialization of the repl.
- */
-trait SparkILoopInit {
-  self: SparkILoop =>
-
-  /** Print a welcome message */
-  def printWelcome() {
-    echo("""Welcome to
-      ____              __
-     / __/__  ___ _____/ /__
-    _\ \/ _ \/ _ `/ __/  '_/
-   /___/ .__/\_,_/_/ /_/\_\   version %s
-      /_/
-""".format(SPARK_VERSION))
-    import Properties._
-    val welcomeMsg = "Using Scala %s (%s, Java %s)".format(
-      versionString, javaVmName, javaVersion)
-    echo(welcomeMsg)
-    echo("Type in expressions to have them evaluated.")
-    echo("Type :help for more information.")
-  }
-
-  protected def asyncMessage(msg: String) {
-    if (isReplInfo || isReplPower)
-      echoAndRefresh(msg)
-  }
-
-  private val initLock = new java.util.concurrent.locks.ReentrantLock()
-  private val initCompilerCondition = initLock.newCondition() // signal the compiler is initialized
-  private val initLoopCondition = initLock.newCondition()     // signal the whole repl is initialized
-  private val initStart = System.nanoTime
-
-  private def withLock[T](body: => T): T = {
-    initLock.lock()
-    try body
-    finally initLock.unlock()
-  }
-  // flags recording whether post-initialization has completed or failed.
-  @volatile private var initIsComplete = false
-  @volatile private var initError: String = null
-  private def elapsed() = "%.3f".format((System.nanoTime - initStart).toDouble / 1000000000L)
-
-  // the method to be called when the interpreter is initialized.
-  // Very important this method does nothing synchronous (i.e. do
-  // not try to use the interpreter) because until it returns, the
-  // repl's lazy val `global` is still locked.
-  protected def initializedCallback() = withLock(initCompilerCondition.signal())
-
-  // Spins off a thread which awaits a single message once the interpreter
-  // has been initialized.
-  protected def createAsyncListener() = {
-    io.spawn {
-      withLock(initCompilerCondition.await())
-      asyncMessage("[info] compiler init time: " + elapsed() + " s.")
-      postInitialization()
-    }
-  }
-
-  // called from main repl loop
-  protected def awaitInitialized(): Boolean = {
-    if (!initIsComplete)
-      withLock { while (!initIsComplete) initLoopCondition.await() }
-    if (initError != null) {
-      println("""
-        |Failed to initialize the REPL due to an unexpected error.
-        |This is a bug; please report it along with the error diagnostics printed below.
-        |%s.""".stripMargin.format(initError)
-      )
-      false
-    } else true
-  }
-  // private def warningsThunks = List(
-  //   () => intp.bind("lastWarnings", "" + typeTag[List[(Position, String)]], intp.lastWarnings _),
-  // )
-
-  protected def postInitThunks = List[Option[() => Unit]](
-    Some(intp.setContextClassLoader _),
-    if (isReplPower) Some(() => enablePowerMode(true)) else None
-  ).flatten
-  // ++ (
-  //   warningsThunks
-  // )
-  // called once after init condition is signalled
-  protected def postInitialization() {
-    try {
-      postInitThunks foreach (f => addThunk(f()))
-      runThunks()
-    } catch {
-      case ex: Throwable =>
-        initError = stackTraceString(ex)
-        throw ex
-    } finally {
-      initIsComplete = true
-
-      if (isAsync) {
-        asyncMessage("[info] total init time: " + elapsed() + " s.")
-        withLock(initLoopCondition.signal())
-      }
-    }
-  }
-
-  def initializeSpark() {
-    intp.beQuietDuring {
-      command("""
-         @transient val sc = org.apache.spark.repl.Main.interp.createSparkContext();
-        """)
-      command("import org.apache.spark.SparkContext._")
-    }
-    echo("Spark context available as sc.")
-  }
-
-  // code to be executed only after the interpreter is initialized
-  // and the lazy val `global` can be accessed without risk of deadlock.
-  private var pendingThunks: List[() => Unit] = Nil
-  protected def addThunk(body: => Unit) = synchronized {
-    pendingThunks :+= (() => body)
-  }
-  protected def runThunks(): Unit = synchronized {
-    if (pendingThunks.nonEmpty)
-      logDebug("Clearing " + pendingThunks.size + " thunks.")
-
-    while (pendingThunks.nonEmpty) {
-      val thunk = pendingThunks.head
-      pendingThunks = pendingThunks.tail
-      thunk()
-    }
-  }
-}
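
The addThunk/runThunks pair above is a monitor-guarded FIFO work queue: code
queued before the interpreter's lazy `global` is safe to touch gets drained
once, in order, after initialization. A stripped-down sketch of the same
pattern (the ThunkQueue name is illustrative, not Spark API):

    trait ThunkQueue {
      private var pending: List[() => Unit] = Nil

      // the by-name parameter defers evaluation until the thunk is run
      def add(body: => Unit): Unit = synchronized {
        pending :+= (() => body)
      }

      // drain in FIFO order; a running thunk may enqueue further thunks,
      // which this loop picks up in the same pass
      def runAll(): Unit = synchronized {
        while (pending.nonEmpty) {
          val next = pending.head
          pending = pending.tail
          next()
        }
      }
    }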

