You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2014/11/07 20:31:30 UTC

[08/20] spark git commit: Scala 2.11 support with repl and all build changes.

http://git-wip-us.apache.org/repos/asf/spark/blob/4af9de7d/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkMemberHandlers.scala
----------------------------------------------------------------------
diff --git a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkMemberHandlers.scala b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkMemberHandlers.scala
new file mode 100644
index 0000000..13cd2b7
--- /dev/null
+++ b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkMemberHandlers.scala
@@ -0,0 +1,232 @@
+// scalastyle:off
+
+/* NSC -- new Scala compiler
+ * Copyright 2005-2013 LAMP/EPFL
+ * @author  Martin Odersky
+ */
+
+package org.apache.spark.repl
+
+import scala.tools.nsc._
+import scala.tools.nsc.interpreter._
+
+import scala.collection.{ mutable, immutable }
+import scala.PartialFunction.cond
+import scala.reflect.internal.Chars
+import scala.reflect.internal.Flags._
+import scala.language.implicitConversions
+
+trait SparkMemberHandlers {
+  val intp: SparkIMain
+
+  import intp.{ Request, global, naming }
+  import global._
+  import naming._
+
+  /** Core generator: passes every fragment through `string2codeQuoted`, joins
+   *  the results with " + " and, when `leadingPlus` is set, prefixes "+ ".
+   */
+  private def codegen(leadingPlus: Boolean, xs: String*): String = {
+    val joined = xs.map(string2codeQuoted).mkString(" + ")
+    if (leadingPlus) "+ " + joined else joined
+  }
+  /** Same as the two-argument form with a leading plus. */
+  private def codegen(xs: String*): String = codegen(true, xs: _*)
+
+  /** Like `codegen`, with a newline fragment appended after `xs`. */
+  private def codegenln(leadingPlus: Boolean, xs: String*): String = codegen(leadingPlus, (xs :+ "\n"): _*)
+  /** Like `codegenln` with a leading plus. */
+  private def codegenln(xs: String*): String = codegenln(true, xs: _*)
+
+  // Lets a compiler Name be passed wherever the helpers above expect a String.
+  private implicit def name2string(name: Name) = name.toString
+
+  /** Collects the name of every `Ident` found in a tree, i.e. the names that
+   *  may need to be imported.  Names starting with `x$` are skipped; the
+   *  result may still contain extra names.
+   */
+  private class ImportVarsTraverser extends Traverser {
+    val importVars = new mutable.HashSet[Name]()
+
+    override def traverse(ast: Tree): Unit = ast match {
+      // XXX this is obviously inadequate but it's going to require some effort
+      // to get right.
+      case Ident(name) if name.toString startsWith "x$" => ()
+      case Ident(name)                                  => importVars += name
+      case other                                        => super.traverse(other)
+    }
+  }
+  private object ImportVarsTraverser {
+    /** Runs a fresh traverser over `member` and returns the collected names as a list. */
+    def apply(member: Tree) = {
+      val collector = new ImportVarsTraverser()
+      collector.traverse(member)
+      collector.importVars.toList
+    }
+  }
+
+  /** Selects the handler matching the tree's node type.  A `DocDef` is
+   *  unwrapped and its documented tree dispatched again; any tree without a
+   *  dedicated handler gets a `GenericHandler`.
+   */
+  def chooseHandler(member: Tree): MemberHandler = member match {
+    case defn: DefDef       => new DefHandler(defn)
+    case valu: ValDef       => new ValHandler(valu)
+    case assign: Assign     => new AssignHandler(assign)
+    case module: ModuleDef  => new ModuleHandler(module)
+    case clazz: ClassDef    => new ClassHandler(clazz)
+    case alias: TypeDef     => new TypeAliasHandler(alias)
+    case imp: Import        => new ImportHandler(imp)
+    case DocDef(_, inner)   => chooseHandler(inner)
+    case other              => new GenericHandler(other)
+  }
+
+  /** Base handler for `MemberDef` trees: exposes the member's symbol, name,
+   *  modifiers and keyword, and derives the defined term/type from the name.
+   */
+  sealed abstract class MemberDefHandler(override val member: MemberDef) extends MemberHandler(member) {
+    // A member whose symbol is null is reported as NoSymbol.
+    def symbol          = member.symbol match {
+      case null => NoSymbol
+      case sym  => sym
+    }
+    def name: Name      = member.name
+    def mods: Modifiers = member.mods
+    def keyword         = member.keyword
+    def prettyName      = name.decode
+
+    override def definesImplicit = mods.isImplicit
+    override def definesTerm: Option[TermName] = if (name.isTermName) Some(name.toTermName) else None
+    override def definesType: Option[TypeName] = if (name.isTypeName) Some(name.toTypeName) else None
+    override def definedSymbols = {
+      val sym = symbol
+      if (sym eq NoSymbol) Nil else List(sym)
+    }
+  }
+
+  /** Handles one member among all the members of a single interpreter
+   *  request.  The defaults describe a member that defines nothing, imports
+   *  nothing and contributes no extra code.
+   */
+  sealed abstract class MemberHandler(val member: Tree) {
+    def definesImplicit = false
+    def definesValue    = false
+    def isLegalTopLevel = false
+
+    def definesTerm     = Option.empty[TermName]
+    def definesType     = Option.empty[TypeName]
+
+    // Names mentioned by the member, computed once on first access.
+    lazy val referencedNames = ImportVarsTraverser(member)
+    def importedNames        = List.empty[Name]
+    def definedNames         = definesTerm.toList ++ definesType.toList
+    def definedOrImported    = definedNames ++ importedNames
+    def definedSymbols       = List.empty[Symbol]
+
+    def extraCodeToEvaluate(req: Request): String = ""
+    def resultExtractionCode(req: Request): String = ""
+
+    // Last dot-separated segment of the runtime class's string form.
+    private def shortName = getClass.toString.split('.').last
+    override def toString = {
+      val refs = referencedNames.mkString(" (refs: ", ", ", ")")
+      shortName + refs
+    }
+  }
+
+  /** Fallback handler: inherits every `MemberHandler` default unchanged. */
+  class GenericHandler(member: Tree) extends MemberHandler(member)
+
+  /** Handler for `ValDef` members; generates the code that prints the
+   *  member's name, type and value after evaluation.
+   */
+  class ValHandler(member: ValDef) extends MemberDefHandler(member) {
+    val maxStringElements = 1000  // no need to mkString billions of elements
+    override def definesValue = true
+
+    /** Returns the result-printing code fragment, or "" when the member is
+     *  not public or is a user-var-named value whose looked-up type is "Unit".
+     */
+    override def resultExtractionCode(req: Request): String = {
+      val isInternal = isUserVarName(name) && req.lookupTypeOf(name) == "Unit"
+      if (!mods.isPublic || isInternal) ""
+      else {
+        // if this is a lazy val we avoid evaluating it here
+        val resultString =
+          if (mods.isLazy) codegenln(false, "<lazy>")
+          else any2stringOf(req fullPath name, maxStringElements)
+
+        // Only emitted when replProps.vids is set: appends the value's
+        // identity hash code to the printed name.
+        val vidString =
+          if (replProps.vids) """" + " @ " + "%%8x".format(System.identityHashCode(%s)) + " """.trim.format(req fullPath name)
+          else ""
+
+        """ + "%s%s: %s = " + %s""".format(string2code(prettyName), vidString, string2code(req typeOf name), resultString)
+      }
+    }
+  }
+
+  /** Handler for `DefDef` members. */
+  class DefHandler(member: DefDef) extends MemberDefHandler(member) {
+    private def isMacro  = member.symbol hasFlag MACRO
+    private def vparamss = member.vparamss
+    // true if not a macro and 0-arity
+    override def definesValue = !isMacro && flattensToEmpty(vparamss)
+    /** Prints "name: type" for public defs; non-public defs print nothing. */
+    override def resultExtractionCode(req: Request) = {
+      if (!mods.isPublic) ""
+      else codegenln(name, ": ", req.typeOf(name))
+    }
+  }
+
+  /** Handler for assignments.  The left-hand side is re-read into a freshly
+   *  named internal val so that it can be printed after the assignment.
+   */
+  class AssignHandler(member: Assign) extends MemberHandler(member) {
+    val Assign(lhs, rhs) = member
+    val name = newTermName(freshInternalVarName())
+
+    override def definesTerm = Some(name)
+    override def definesValue = true
+    override def extraCodeToEvaluate(req: Request) =
+      "val %s = %s".format(name, lhs)
+
+    /** Print out lhs instead of the generated varName */
+    override def resultExtractionCode(req: Request) = {
+      val typeCode  = string2code(req lookupTypeOf name)
+      val valueCode = string2code(req fullPath name)
+      val printer   = """ + "%s: %s = " + %s + "\n" """.format(string2code(lhs.toString), typeCode, valueCode)
+      printer + "\n"
+    }
+  }
+
+  /** Handler for `ModuleDef` members: always defines a term and a value, is
+   *  legal at top level, and reports "defined module <name>".
+   */
+  class ModuleHandler(module: ModuleDef) extends MemberDefHandler(module) {
+    // NOTE(review): `name` is declared as Name while the inherited result type
+    // is Option[TermName]; presumably an implicit conversion bridges the two.
+    override def definesTerm = Some(name)
+    override def definesValue = true
+    override def isLegalTopLevel = true
+
+    override def resultExtractionCode(req: Request) = codegenln("defined module ", name)
+  }
+
+  /** Handler for `ClassDef` members. */
+  class ClassHandler(member: ClassDef) extends MemberDefHandler(member) {
+    override def isLegalTopLevel = true
+    override def definesType = Some(name.toTypeName)
+    // A term name is only defined when the modifiers mark the class as `case`.
+    override def definesTerm = if (mods.isCase) Some(name.toTermName) else None
+
+    /** Reports "defined <keyword> <name>". */
+    override def resultExtractionCode(req: Request) = {
+      val message = "defined %s %s".format(keyword, name)
+      codegenln(message)
+    }
+  }
+
+  /** Handler for `TypeDef` members; a type is only defined when the member is
+   *  public and `treeInfo.isAliasTypeDef` accepts it.
+   */
+  class TypeAliasHandler(member: TypeDef) extends MemberDefHandler(member) {
+    private def isAlias = mods.isPublic && treeInfo.isAliasTypeDef(member)
+    override def definesType = if (isAlias) Some(name.toTypeName) else None
+
+    /** Reports "defined type alias <name>" followed by an extra newline. */
+    override def resultExtractionCode(req: Request) = {
+      val announcement = codegenln("defined type alias ", name)
+      announcement + "\n"
+    }
+  }
+
+  /** Handler for `import` statements.  Besides echoing the import, it exposes
+   *  the names and symbols the import brings into scope so that later lines
+   *  can re-import what they reference.
+   */
+  class ImportHandler(imp: Import) extends MemberHandler(imp) {
+    val Import(expr, selectors) = imp
+    // Type of the import's qualifier expression, obtained from the interpreter.
+    def targetType: Type = intp.typeOfExpression("" + expr)
+    override def isLegalTopLevel = true
+
+    /** Builds an import statement for `name`.  If one of this import's
+     *  selectors renames to `name`, that selector is reproduced inside
+     *  braces; otherwise a plain `import expr.name` is returned.
+     */
+    def createImportForName(name: Name): String =
+      // collectFirst stops at the first matching selector, like the former
+      // `return` from inside `foreach`, but without a non-local return.
+      selectors.collectFirst {
+        case sel @ ImportSelector(_, _, `name`, _) => "import %s.{ %s }".format(expr, sel)
+      }.getOrElse("import %s.%s".format(expr, name))
+
+    // TODO: Need to track these specially to honor Predef masking attempts,
+    // because they must be the leading imports in the code generated for each
+    // line.  We can use the same machinery as Contexts now, anyway.
+    def isPredefImport = isReferenceToPredef(expr)
+
+    // wildcard imports, e.g. import foo._
+    private def selectorWild    = selectors filter (_.name == nme.USCOREkw)
+    // renamed imports, e.g. import foo.{ bar => baz }
+    private def selectorRenames = selectors map (_.rename) filterNot (_ == null)
+
+    /** Whether this import includes a wildcard import */
+    val importsWildcard = selectorWild.nonEmpty
+
+    /** Whether anything imported is implicit. */
+    def importsImplicit = implicitSymbols.nonEmpty
+
+    def implicitSymbols = importedSymbols filter (_.isImplicit)
+    def importedSymbols = individualSymbols ++ wildcardSymbols
+
+    /** Symbols of the individually selected names, looked up on the target type. */
+    lazy val individualSymbols: List[Symbol] =
+      beforePickler(individualNames map (targetType nonPrivateMember _))
+
+    /** All non-private members of the target type, or Nil without a wildcard. */
+    lazy val wildcardSymbols: List[Symbol] =
+      if (importsWildcard) beforePickler(targetType.nonPrivateMembers.toList)
+      else Nil
+
+    /** Complete list of names imported by a wildcard */
+    lazy val wildcardNames: List[Name]   = wildcardSymbols map (_.name)
+    lazy val individualNames: List[Name] = selectorRenames filterNot (_ == nme.USCOREkw) flatMap (_.bothNames)
+
+    /** The names imported by this statement */
+    override lazy val importedNames: List[Name] = wildcardNames ++ individualNames
+    lazy val importsSymbolNamed: Set[String] = (importedNames map (_.toString)).toSet
+
+    def importString = imp.toString
+    override def resultExtractionCode(req: Request) = codegenln(importString) + "\n"
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/4af9de7d/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkRunnerSettings.scala
----------------------------------------------------------------------
diff --git a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkRunnerSettings.scala b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkRunnerSettings.scala
new file mode 100644
index 0000000..7fd5fbb
--- /dev/null
+++ b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkRunnerSettings.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.repl
+
+import scala.tools.nsc.Settings
+
+/**
+ * Compiler `Settings` for the Spark REPL runner.  On top of the inherited
+ * options it declares `-i`, a multi-valued option naming files to load.
+ */
+class SparkRunnerSettings(error: String => Unit) extends Settings(error) {
+
+  val loadfiles =
+    MultiStringSetting("-i", "file", "load a file (assumes the code is given interactively)")
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/4af9de7d/repl/scala-2.10/src/test/scala/org/apache/spark/repl/ReplSuite.scala
----------------------------------------------------------------------
diff --git a/repl/scala-2.10/src/test/scala/org/apache/spark/repl/ReplSuite.scala b/repl/scala-2.10/src/test/scala/org/apache/spark/repl/ReplSuite.scala
new file mode 100644
index 0000000..91c9c52
--- /dev/null
+++ b/repl/scala-2.10/src/test/scala/org/apache/spark/repl/ReplSuite.scala
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.repl
+
+import java.io._
+import java.net.URLClassLoader
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.scalatest.FunSuite
+import org.apache.spark.SparkContext
+import org.apache.commons.lang3.StringEscapeUtils
+import org.apache.spark.util.Utils
+
+
+class ReplSuite extends FunSuite {
+
+  /**
+   * Feeds `input` to a fresh `SparkILoop` running against `master` and returns
+   * everything the loop wrote to its output.
+   *
+   * While the loop runs, `spark.executor.extraClassPath` is pointed at the
+   * `file:` entries of this class's class loader and `Main.interp` refers to
+   * the loop.  Both are restored, and the loop's SparkContext is stopped, even
+   * when the loop throws, so that a failure cannot leak into later tests.
+   *
+   * @param master Spark master string passed to the loop
+   * @param input  script to interpret; a trailing newline is appended
+   * @return the captured interpreter output
+   */
+  def runInterpreter(master: String, input: String): String = {
+    val CONF_EXECUTOR_CLASSPATH = "spark.executor.extraClassPath"
+
+    val in = new BufferedReader(new StringReader(input + "\n"))
+    val out = new StringWriter()
+    // Only a URLClassLoader exposes its entries; keep the ones that are files.
+    val paths = getClass.getClassLoader match {
+      case urlLoader: URLClassLoader =>
+        urlLoader.getURLs.toSeq.filter(_.getProtocol == "file").map(_.getFile)
+      case _ =>
+        Seq.empty[String]
+    }
+    val classpath = paths.mkString(File.pathSeparator)
+
+    val oldExecutorClasspath = System.getProperty(CONF_EXECUTOR_CLASSPATH)
+    System.setProperty(CONF_EXECUTOR_CLASSPATH, classpath)
+
+    val interp = new SparkILoop(in, new PrintWriter(out), master)
+    org.apache.spark.repl.Main.interp = interp
+    try {
+      interp.process(Array("-classpath", classpath))
+    } finally {
+      org.apache.spark.repl.Main.interp = null
+      try {
+        if (interp.sparkContext != null) {
+          interp.sparkContext.stop()
+        }
+      } finally {
+        // Put the property back exactly as it was found (set or absent).
+        Option(oldExecutorClasspath) match {
+          case Some(previous) => System.setProperty(CONF_EXECUTOR_CLASSPATH, previous)
+          case None           => System.clearProperty(CONF_EXECUTOR_CLASSPATH)
+        }
+      }
+    }
+    out.toString
+  }
+
+  /** Fails, quoting the whole interpreter output, unless `output` contains `message`. */
+  def assertContains(message: String, output: String) {
+    // NOTE(review): the condition is bound to a local before asserting,
+    // presumably so the failure text is the clue below -- confirm before inlining.
+    val isContain = output.contains(message)
+    assert(isContain,
+      "Interpreter output did not contain '" + message + "':\n" + output)
+  }
+
+  /** Fails, quoting the whole interpreter output, if `output` contains `message`. */
+  def assertDoesNotContain(message: String, output: String) {
+    // NOTE(review): the condition is bound to a local before asserting,
+    // presumably so the failure text is the clue below -- confirm before inlining.
+    val isContain = output.contains(message)
+    assert(!isContain,
+      "Interpreter output contained '" + message + "':\n" + output)
+  }
+
+  // Checks that a local property set on the SparkContext by the calling thread
+  // is visible to the code run by `interpret`.
+  test("propagation of local properties") {
+    // A mock ILoop that doesn't install the SIGINT handler.
+    class ILoop(out: PrintWriter) extends SparkILoop(None, out, None) {
+      settings = new scala.tools.nsc.Settings
+      settings.usejavacp.value = true
+      org.apache.spark.repl.Main.interp = this
+      override def createInterpreter() {
+        intp = new SparkILoopInterpreter
+        intp.setContextClassLoader()
+      }
+    }
+
+    val out = new StringWriter()
+    val interp = new ILoop(new PrintWriter(out))
+    interp.sparkContext = new SparkContext("local", "repl-test")
+    try {
+      interp.createInterpreter()
+      interp.intp.initialize()
+      interp.sparkContext.setLocalProperty("someKey", "someValue")
+
+      // Make sure the value we set in the caller to interpret is propagated in the thread that
+      // interprets the command.
+      interp.interpret("org.apache.spark.repl.Main.interp.sparkContext.getLocalProperty(\"someKey\")")
+      assert(out.toString.contains("someValue"))
+    } finally {
+      // Runs even when a step above fails, so that no live SparkContext,
+      // stale Main.interp or driver port setting leaks into later tests.
+      try {
+        interp.sparkContext.stop()
+      } finally {
+        org.apache.spark.repl.Main.interp = null
+        System.clearProperty("spark.driver.port")
+      }
+    }
+  }
+
+  // Adds 1..10 into an accumulator from a distributed foreach and expects the
+  // driver to read back 55.
+  test("simple foreach with accumulator") {
+    val output = runInterpreter("local",
+      """
+        |val accum = sc.accumulator(0)
+        |sc.parallelize(1 to 10).foreach(x => accum += x)
+        |accum.value
+      """.stripMargin)
+    assertDoesNotContain("error:", output)
+    assertDoesNotContain("Exception", output)
+    assertContains("res1: Int = 55", output)
+  }
+
+  // A closure referencing a REPL `var` must see the value current when the job
+  // runs: ten elements mapped to v give 70 for v = 7 and 100 for v = 10.
+  test("external vars") {
+    val output = runInterpreter("local",
+      """
+        |var v = 7
+        |sc.parallelize(1 to 10).map(x => v).collect.reduceLeft(_+_)
+        |v = 10
+        |sc.parallelize(1 to 10).map(x => v).collect.reduceLeft(_+_)
+      """.stripMargin)
+    assertDoesNotContain("error:", output)
+    assertDoesNotContain("Exception", output)
+    assertContains("res0: Int = 70", output)
+    assertContains("res1: Int = 100", output)
+  }
+
+  // A class defined in the REPL is instantiated inside a task; ten calls of
+  // foo (= 5) sum to 50.
+  test("external classes") {
+    val output = runInterpreter("local",
+      """
+        |class C {
+        |def foo = 5
+        |}
+        |sc.parallelize(1 to 10).map(x => (new C).foo).collect.reduceLeft(_+_)
+      """.stripMargin)
+    assertDoesNotContain("error:", output)
+    assertDoesNotContain("Exception", output)
+    assertContains("res0: Int = 50", output)
+  }
+
+  // A function defined in the REPL is called inside a task; doubling 1..10
+  // sums to 110.
+  test("external functions") {
+    val output = runInterpreter("local",
+      """
+        |def double(x: Int) = x + x
+        |sc.parallelize(1 to 10).map(x => double(x)).collect.reduceLeft(_+_)
+      """.stripMargin)
+    assertDoesNotContain("error:", output)
+    assertDoesNotContain("Exception", output)
+    assertContains("res0: Int = 110", output)
+  }
+
+  // Same as "external vars", but the var is read through a REPL-defined
+  // function: 70 while v = 7, then 100 after v = 10.
+  test("external functions that access vars") {
+    val output = runInterpreter("local",
+      """
+        |var v = 7
+        |def getV() = v
+        |sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_)
+        |v = 10
+        |sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_)
+      """.stripMargin)
+    assertDoesNotContain("error:", output)
+    assertDoesNotContain("Exception", output)
+    assertContains("res0: Int = 70", output)
+    assertContains("res1: Int = 100", output)
+  }
+
+  // Reads a broadcast array before and after the driver mutates the original.
+  // Note that the second assertion expects the mutation to be visible (5 in
+  // slot 0), which is the local-mode limitation described in the TODO below.
+  test("broadcast vars") {
+    // Test that the value that a broadcast var had when it was created is used,
+    // even if that variable is then modified in the driver program
+    // TODO: This doesn't actually work for arrays when we run in local mode!
+    val output = runInterpreter("local",
+      """
+        |var array = new Array[Int](5)
+        |val broadcastArray = sc.broadcast(array)
+        |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
+        |array(0) = 5
+        |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
+      """.stripMargin)
+    assertDoesNotContain("error:", output)
+    assertDoesNotContain("Exception", output)
+    assertContains("res0: Array[Int] = Array(0, 0, 0, 0, 0)", output)
+    assertContains("res2: Array[Int] = Array(5, 0, 0, 0, 0)", output)
+  }
+
+  // Writes a three-line file, then counts its lines three times through a
+  // cached RDD; every count must be 3.
+  test("interacting with files") {
+    val tempDir = Utils.createTempDir()
+    val script =
+      """
+        |var file = sc.textFile("%s").cache()
+        |file.count()
+        |file.count()
+        |file.count()
+      """.stripMargin
+    try {
+      // One File object is used for both writing and reading, so the two
+      // paths cannot drift apart.
+      val inputFile = new File(tempDir, "input")
+      val out = new FileWriter(inputFile)
+      try {
+        out.write("Hello world!\n")
+        out.write("What's up?\n")
+        out.write("Goodbye\n")
+      } finally {
+        out.close()
+      }
+      val output = runInterpreter("local",
+        script.format(StringEscapeUtils.escapeJava(inputFile.getAbsolutePath)))
+      assertDoesNotContain("error:", output)
+      assertDoesNotContain("Exception", output)
+      assertContains("res0: Long = 3", output)
+      assertContains("res1: Long = 3", output)
+      assertContains("res2: Long = 3", output)
+    } finally {
+      // Remove the directory even when an assertion above fails.
+      Utils.deleteRecursively(tempDir)
+    }
+  }
+
+  // Repeats the var/function and broadcast scenarios against a local-cluster
+  // master.  Unlike the "broadcast vars" test on `local`, the broadcast array
+  // is expected to stay all zeros after the driver mutates the original.
+  test("local-cluster mode") {
+    val output = runInterpreter("local-cluster[1,1,512]",
+      """
+        |var v = 7
+        |def getV() = v
+        |sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_)
+        |v = 10
+        |sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_)
+        |var array = new Array[Int](5)
+        |val broadcastArray = sc.broadcast(array)
+        |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
+        |array(0) = 5
+        |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
+      """.stripMargin)
+    assertDoesNotContain("error:", output)
+    assertDoesNotContain("Exception", output)
+    assertContains("res0: Int = 70", output)
+    assertContains("res1: Int = 100", output)
+    assertContains("res2: Array[Int] = Array(0, 0, 0, 0, 0)", output)
+    assertContains("res4: Array[Int] = Array(0, 0, 0, 0, 0)", output)
+  }
+
+  // Regression test: a case class defined on one line must still type check
+  // when constructed and pattern matched on later lines.  Only the absence of
+  // errors and exceptions is asserted.
+  test("SPARK-1199 two instances of same class don't type check.") {
+    val output = runInterpreter("local-cluster[1,1,512]",
+      """
+        |case class Sum(exp: String, exp2: String)
+        |val a = Sum("A", "B")
+        |def b(a: Sum): String = a match { case Sum(_, _) => "Found Sum" }
+        |b(a)
+      """.stripMargin)
+    assertDoesNotContain("error:", output)
+    assertDoesNotContain("Exception", output)
+  }
+
+  // Regression test: two statements on one line separated by `;`, the second
+  // referring to the first.  Only the absence of errors and exceptions is asserted.
+  test("SPARK-2452 compound statements.") {
+    val output = runInterpreter("local",
+      """
+        |val x = 4 ; def f() = x
+        |f()
+      """.stripMargin)
+    assertDoesNotContain("error:", output)
+    assertDoesNotContain("Exception", output)
+  }
+
+  // Regression test: importing a member of a REPL-created SQLContext and then
+  // shipping REPL-defined case class instances to executors.  Only the absence
+  // of errors and exceptions is asserted.
+  test("SPARK-2576 importing SQLContext.createSchemaRDD.") {
+    // We need to use local-cluster to test this case.
+    val output = runInterpreter("local-cluster[1,1,512]",
+      """
+        |val sqlContext = new org.apache.spark.sql.SQLContext(sc)
+        |import sqlContext.createSchemaRDD
+        |case class TestCaseClass(value: Int)
+        |sc.parallelize(1 to 10).map(x => TestCaseClass(x)).toSchemaRDD.collect
+      """.stripMargin)
+    assertDoesNotContain("error:", output)
+    assertDoesNotContain("Exception", output)
+  }
+
+  // Regression test: an import from an instance of a non-serializable class
+  // that is never used afterwards must not break a later job.  Only the
+  // absence of errors and exceptions is asserted.
+  test("SPARK-2632 importing a method from non serializable class and not using it.") {
+    val output = runInterpreter("local",
+    """
+      |class TestClass() { def testMethod = 3 }
+      |val t = new TestClass
+      |import t.testMethod
+      |case class TestCaseClass(value: Int)
+      |sc.parallelize(1 to 10).map(x => TestCaseClass(x)).collect
+    """.stripMargin)
+    assertDoesNotContain("error:", output)
+    assertDoesNotContain("Exception", output)
+  }
+
+  // This test is only registered when the MESOS_NATIVE_LIBRARY environment
+  // variable is set.  It runs the same script and assertions as
+  // "local-cluster mode".
+  // NOTE(review): the master string is "localquiet" -- confirm this is intended.
+  if (System.getenv("MESOS_NATIVE_LIBRARY") != null) {
+    test("running on Mesos") {
+      val output = runInterpreter("localquiet",
+        """
+          |var v = 7
+          |def getV() = v
+          |sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_)
+          |v = 10
+          |sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_)
+          |var array = new Array[Int](5)
+          |val broadcastArray = sc.broadcast(array)
+          |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
+          |array(0) = 5
+          |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
+        """.stripMargin)
+      assertDoesNotContain("error:", output)
+      assertDoesNotContain("Exception", output)
+      assertContains("res0: Int = 70", output)
+      assertContains("res1: Int = 100", output)
+      assertContains("res2: Array[Int] = Array(0, 0, 0, 0, 0)", output)
+      assertContains("res4: Array[Int] = Array(0, 0, 0, 0, 0)", output)
+    }
+  }
+
+  // Instances of a REPL-defined case class are collected back to the driver;
+  // the printed array must start with Foo(1).
+  test("collecting objects of class defined in repl") {
+    val output = runInterpreter("local[2]",
+      """
+        |case class Foo(i: Int)
+        |val ret = sc.parallelize((1 to 100).map(Foo), 10).collect
+      """.stripMargin)
+    assertDoesNotContain("error:", output)
+    assertDoesNotContain("Exception", output)
+    assertContains("ret: Array[Foo] = Array(Foo(1),", output)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/4af9de7d/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala
----------------------------------------------------------------------
diff --git a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala
new file mode 100644
index 0000000..5e93a71
--- /dev/null
+++ b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.repl
+
+import org.apache.spark.util.Utils
+import org.apache.spark._
+
+import scala.tools.nsc.Settings
+import scala.tools.nsc.interpreter.SparkILoop
+
+object Main extends Logging {
+
+  // The initialisers below run in order when the object is first accessed.
+  val conf = new SparkConf()
+  val tmp = System.getProperty("java.io.tmpdir")
+  // Parent directory for REPL output: `spark.repl.classdir`, else the JVM temp dir.
+  val rootDir = conf.get("spark.repl.classdir", tmp)
+  // Fresh temporary directory under rootDir, handed to the compiler as -Yrepl-outdir.
+  val outputDir = Utils.createTempDir(rootDir)
+  // Compiler settings passed to the interpreter loop in `main`.
+  val s = new Settings()
+  s.processArguments(List("-Yrepl-class-based",
+    "-Yrepl-outdir", s"${outputDir.getAbsolutePath}", "-Yrepl-sync"), true)
+  // HTTP server over outputDir; its URI is published by createSparkContext.
+  val classServer = new HttpServer(outputDir, new SecurityManager(conf))
+  // Assigned by createSparkContext; stays null until then.
+  var sparkContext: SparkContext = _
+  var interp = new SparkILoop // this is a public var because tests reset it.
+
+  /**
+   * Entry point of the Spark shell: starts the class server, runs the
+   * interpreter loop until it exits, then shuts everything down.
+   *
+   * The class server and the SparkContext (if one was created) are stopped
+   * even when the loop terminates with an exception.
+   *
+   * @param args command-line arguments (not used here)
+   */
+  def main(args: Array[String]): Unit = {
+    if (getMaster == "yarn-client") System.setProperty("SPARK_YARN_MODE", "true")
+    // Start the classServer; createSparkContext puts its URI into the
+    // SparkConf as `spark.repl.class.uri`.
+    classServer.start()
+    try {
+      interp.process(s) // Repl starts and goes in loop of R.E.P.L
+    } finally {
+      try {
+        classServer.stop()
+      } finally {
+        // sparkContext stays null if the shell never created one.
+        Option(sparkContext).foreach(_.stop())
+      }
+    }
+  }
+
+
+  /**
+   * Jars to add to the SparkContext: a non-empty `spark.jars` system property
+   * wins, otherwise the `ADD_JARS` environment variable is used.  The chosen
+   * comma-separated list is passed through `Utils.resolveURIs` and split,
+   * with empty entries dropped.
+   */
+  def getAddedJars: Array[String] = {
+    val fromProps = sys.props.get("spark.jars").filter(_ != "")
+    val fromEnv = sys.env.get("ADD_JARS")
+    val jarList = fromProps.orElse(fromEnv).getOrElse("")
+    Utils.resolveURIs(jarList).split(",").filter(_.nonEmpty)
+  }
+
+  /**
+   * Builds the shell's SparkContext, stores it in `sparkContext` and returns it.
+   * The configuration carries the master, the added jars and the class
+   * server URI, plus `spark.executor.uri` / the Spark home when the
+   * SPARK_EXECUTOR_URI / SPARK_HOME environment variables are set.
+   */
+  def createSparkContext(): SparkContext = {
+    val shellConf = new SparkConf()
+      .setMaster(getMaster)
+      .setAppName("Spark shell")
+      .setJars(getAddedJars)
+      .set("spark.repl.class.uri", classServer.uri)
+    logInfo("Spark class server started at " + classServer.uri)
+    Option(System.getenv("SPARK_EXECUTOR_URI")).foreach { uri =>
+      shellConf.set("spark.executor.uri", uri)
+    }
+    Option(System.getenv("SPARK_HOME")).foreach { home =>
+      shellConf.setSparkHome(home)
+    }
+    sparkContext = new SparkContext(shellConf)
+    logInfo("Created spark context..")
+    sparkContext
+  }
+
+  /**
+   * Master URL for the shell: the `spark.master` system property, else the
+   * `MASTER` environment variable, else "local[*]".
+   */
+  private def getMaster: String =
+    sys.props.get("spark.master")
+      .orElse(sys.env.get("MASTER"))
+      .getOrElse("local[*]")
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/4af9de7d/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkExprTyper.scala
----------------------------------------------------------------------
diff --git a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkExprTyper.scala b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkExprTyper.scala
new file mode 100644
index 0000000..8e519fa
--- /dev/null
+++ b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkExprTyper.scala
@@ -0,0 +1,86 @@
+/* NSC -- new Scala compiler
+ * Copyright 2005-2013 LAMP/EPFL
+ * @author  Paul Phillips
+ */
+
+package scala.tools.nsc
+package interpreter
+
+import scala.tools.nsc.ast.parser.Tokens.EOF
+
+trait SparkExprTyper {
+  val repl: SparkIMain
+
+  import repl._
+  import global.{ reporter => _, Import => _, _ }
+  import naming.freshInternalVarName
+
+  /** Interprets `code` and returns a symbol for it.  The code is first tried
+   *  silently as an expression, then silently as a definition; `asError`
+   *  interprets it once more outside `beSilentDuring` and yields NoSymbol.
+   */
+  def symbolOfLine(code: String): Symbol = {
+    // Wraps the code in a freshly named def and returns a clone of that def's
+    // symbol carrying the final result type.
+    def asExpr(): Symbol = {
+      val name  = freshInternalVarName()
+      // Typing it with a lazy val would give us the right type, but runs
+      // into compiler bugs with things like existentials, so we compile it
+      // behind a def and strip the NullaryMethodType which wraps the expr.
+      val line = "def " + name + " = " + code
+
+      interpretSynthetic(line) match {
+        case IR.Success =>
+          val sym0 = symbolOfTerm(name)
+          // drop NullaryMethodType
+          sym0.cloneSymbol setInfo exitingTyper(sym0.tpe_*.finalResultType)
+        case _          => NoSymbol
+      }
+    }
+    // Interprets the code as is and returns whatever symbols it newly defined:
+    // none -> NoSymbol, one -> that symbol, several -> an overloaded symbol.
+    def asDefn(): Symbol = {
+      val old = repl.definedSymbolList.toSet
+
+      interpretSynthetic(code) match {
+        case IR.Success =>
+          repl.definedSymbolList filterNot old match {
+            case Nil        => NoSymbol
+            case sym :: Nil => sym
+            case syms       => NoSymbol.newOverloaded(NoPrefix, syms)
+          }
+        case _ => NoSymbol
+      }
+    }
+    // Not wrapped in beSilentDuring, so interpreter output is not suppressed.
+    def asError(): Symbol = {
+      interpretSynthetic(code)
+      NoSymbol
+    }
+    // NOTE(review): each alternative is presumably evaluated only when the
+    // previous one produced NoSymbol -- confirm against Symbol.orElse.
+    beSilentDuring(asExpr()) orElse beSilentDuring(asDefn()) orElse asError()
+  }
+
+  // Number of typeOfExpression calls currently in progress.
+  private var typeOfExpressionDepth = 0
+
+  /** Returns the type of `expr`, or NoType when it cannot be determined or
+   *  when more than two calls are already in progress.  With `silent` set to
+   *  false, a failed silent attempt is repeated non-silently.
+   */
+  def typeOfExpression(expr: String, silent: Boolean = true): Type =
+    if (typeOfExpressionDepth > 2) {
+      repldbg("Terminating typeOfExpression recursion for expression: " + expr)
+      NoType
+    }
+    else {
+      typeOfExpressionDepth += 1
+      // There is presently no good way to suppress undesirable success output
+      // while letting errors through, so the expression is typed silently
+      // first; if that fails and errors are wanted, it is typed again
+      // non-silently to induce the error message.
+      try {
+        beSilentDuring(symbolOfLine(expr).tpe) match {
+          case NoType if !silent => symbolOfLine(expr).tpe // generate error
+          case tpe               => tpe
+        }
+      }
+      finally typeOfExpressionDepth -= 1
+    }
+
+  /** Resolves a type written as source text by silently interpreting a
+   *  synthetic `def <fresh>: <typeString> = ???` and reading back its return
+   *  type; yields NoType when that fails.  This only works for proper types.
+   */
+  def typeOfTypeString(typeString: String): Type = {
+    val resolved: Option[Type] = beSilentDuring {
+      val methodName = freshInternalVarName()
+      val line       = "def %s: %s = ???".format(methodName, typeString)
+      interpretSynthetic(line) match {
+        case IR.Success => Some(symbolOfTerm(methodName).asMethod.returnType)
+        case _          => None
+      }
+    }
+    resolved getOrElse NoType
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/4af9de7d/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala
----------------------------------------------------------------------
diff --git a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala
new file mode 100644
index 0000000..a591e9f
--- /dev/null
+++ b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala
@@ -0,0 +1,966 @@
+/* NSC -- new Scala compiler
+ * Copyright 2005-2013 LAMP/EPFL
+ * @author Alexander Spoon
+ */
+
+package scala
+package tools.nsc
+package interpreter
+
+import scala.language.{ implicitConversions, existentials }
+import scala.annotation.tailrec
+import Predef.{ println => _, _ }
+import interpreter.session._
+import StdReplTags._
+import scala.reflect.api.{Mirror, Universe, TypeCreator}
+import scala.util.Properties.{ jdkHome, javaVersion, versionString, javaVmName }
+import scala.tools.nsc.util.{ ClassPath, Exceptional, stringFromWriter, stringFromStream }
+import scala.reflect.{ClassTag, classTag}
+import scala.reflect.internal.util.{ BatchSourceFile, ScalaClassLoader }
+import ScalaClassLoader._
+import scala.reflect.io.{ File, Directory }
+import scala.tools.util._
+import scala.collection.generic.Clearable
+import scala.concurrent.{ ExecutionContext, Await, Future, future }
+import ExecutionContext.Implicits._
+import java.io.{ BufferedReader, FileReader }
+
+/** The Scala interactive shell.  It provides a read-eval-print loop
+  *  around the Interpreter class.
+  *  After instantiation, clients should call the main() method.
+  *
+  *  If no in0 is specified, then input will come from the console, and
+  *  the class will attempt to provide input editing features such as
+  *  input history.
+  *
+  *  @author Moez A. Abdel-Gawad
+  *  @author  Lex Spoon
+  *  @version 1.2
+  */
+class SparkILoop(in0: Option[BufferedReader], protected val out: JPrintWriter)
+  extends AnyRef
+  with LoopCommands
+{
+  /** Reads commands from the given reader instead of choosing a console reader. */
+  def this(in0: BufferedReader, out: JPrintWriter) = this(Some(in0), out)
+  /** No explicit input reader; output goes to a JPrintWriter wrapping `Console.out`. */
+  def this() = this(None, new JPrintWriter(Console.out, true))
+//
+//  @deprecated("Use `intp` instead.", "2.9.0") def interpreter = intp
+//  @deprecated("Use `intp` instead.", "2.9.0") def interpreter_= (i: Interpreter): Unit = intp = i
+
+  // All four fields start out null and are assigned later (see `process`).
+  var in: InteractiveReader = _   // the input stream from which commands come
+  var settings: Settings = _      // compiler/REPL settings, assigned in `process`
+  var intp: SparkIMain = _        // the interpreter; created by `createInterpreter`, nulled by `closeInterpreter`
+
+  // Asynchronous interpreter initialization started by `process`; `processLine` waits on it.
+  var globalFuture: Future[Boolean] = _
+
+  /** Echoes `msg` (and redraws the input line) only when repl info or power mode is on. */
+  protected def asyncMessage(msg: String): Unit = {
+    val verbose = isReplInfo || isReplPower
+    if (verbose) echoAndRefresh(msg)
+  }
+
+  /** Quietly defines `sc` in the session and imports `SparkContext._`,
+   *  then announces that the context is available.
+   */
+  def initializeSpark(): Unit = {
+    intp beQuietDuring {
+      // Runs as ordinary REPL input, so `sc` becomes a session-level val.
+      command( """
+         @transient val sc = org.apache.spark.repl.Main.createSparkContext();
+               """)
+      command("import org.apache.spark.SparkContext._")
+    }
+    echo("Spark context available as sc.")
+  }
+
+  /** Prints the Spark banner (with the Spark version), the Scala/JVM versions and usage hints. */
+  def printWelcome(): Unit = {
+    import org.apache.spark.SPARK_VERSION
+    echo("""Welcome to
+      ____              __
+     / __/__  ___ _____/ /__
+    _\ \/ _ \/ _ `/ __/  '_/
+   /___/ .__/\_,_/_/ /_/\_\   version %s
+      /_/
+         """.format(SPARK_VERSION))
+    echo("Using Scala %s (%s, Java %s)".format(versionString, javaVmName, javaVersion))
+    echo("Type in expressions to have them evaluated.")
+    echo("Type :help for more information.")
+  }
+
+  /** Command messages go through the interpreter's reporter, untruncated. */
+  override def echoCommandMessage(msg: String): Unit =
+    intp.reporter.printUntruncatedMessage(msg)
+
+  // lazy val power = new Power(intp, new StdReplVals(this))(tagOfStdReplVals, classTag[StdReplVals])
+  /** History of the current input reader. */
+  def history = in.history
+
+  // classpath entries added via :cp
+  var addedClasspath: String = ""
+
+  /** Commands recorded for :replay, most recent first. */
+  var replayCommandStack: List[String] = Nil
+
+  /** The recorded commands in the order they were entered. */
+  def replayCommands = replayCommandStack.reverse
+
+  /** Pushes `cmd` onto the replay stack so a later :replay re-runs it. */
+  def addReplay(cmd: String) = {
+    replayCommandStack = cmd :: replayCommandStack
+  }
+
+  /** Runs `body`, then restores the replay stack to what it was beforehand,
+   *  even if `body` throws.
+   */
+  def savingReplayStack[T](body: => T): T = {
+    val stackBefore = replayCommandStack
+    try {
+      body
+    }
+    finally {
+      replayCommandStack = stackBefore
+    }
+  }
+  /** Runs `body`, then restores the input reader `in` to what it was beforehand,
+   *  even if `body` throws.
+   */
+  def savingReader[T](body: => T): T = {
+    val readerBefore = in
+    try {
+      body
+    }
+    finally {
+      in = readerBefore
+    }
+  }
+
+  /** Closes the interpreter, if there is one, and clears the `intp` reference. */
+  def closeInterpreter(): Unit =
+    Option(intp) foreach { live =>
+      live.close()
+      intp = null
+    }
+
+  /** The interpreter used by this loop: a SparkIMain over the loop's settings
+   *  and output, formatted with the loop's prompt.
+   */
+  class SparkILoopInterpreter extends SparkIMain(settings, out) {
+    outer =>
+
+    // Formatting needs the prompt; delegate to the enclosing loop.
+    override lazy val formatting = new Formatting {
+      def prompt = SparkILoop.this.prompt
+    }
+
+    // Prefer an explicitly configured parent loader; otherwise use the loader
+    // that loaded SparkILoop itself.
+    override protected def parentClassLoader =
+      settings.explicitParentLoader getOrElse classOf[SparkILoop].getClassLoader
+  }
+
+  /** Creates a fresh interpreter, first appending any :cp additions to the
+   *  settings' classpath.
+   */
+  def createInterpreter(): Unit = {
+    val hasExtraClasspath = addedClasspath != ""
+    if (hasExtraClasspath)
+      settings.classpath.append(addedClasspath)
+
+    intp = new SparkILoopInterpreter
+  }
+
+  /** `:help` — prints the command summary when no argument is given, the help
+   *  text of the named command when it resolves uniquely, or an error otherwise.
+   */
+  def helpCommand(line: String): Result = line match {
+    case "" => helpSummary()
+    case _  =>
+      uniqueCommand(line) match {
+        case Some(lc) => echo("\n" + lc.help)
+        case _        => ambiguousError(line)
+      }
+  }
+  /** Prints one aligned line per command: its usage message followed by its help text. */
+  private def helpSummary() = {
+    // Pad every usage message to the width of the longest one.
+    val widest = commands.map(_.usageMsg.length).max
+    val layout = "%-" + widest + "s %s"
+
+    echo("All commands can be abbreviated, e.g. :he instead of :help.")
+
+    for (c <- commands)
+      echo(layout.format(c.usageMsg, c.help))
+  }
+  /** Reports that `cmd` matched no command, or lists the commands it is
+   *  ambiguous between. Always keeps the loop running and records nothing.
+   */
+  private def ambiguousError(cmd: String): Result = {
+    val candidates = matchingCommands(cmd)
+    if (candidates.isEmpty)
+      echo(cmd + ": no such command.  Type :help for help.")
+    else
+      echo(cmd + " is ambiguous: did you mean " + candidates.map(":" + _.name).mkString(" or ") + "?")
+    Result(keepRunning = true, None)
+  }
+  // All commands whose name begins with `cmd`.
+  private def matchingCommands(cmd: String) = commands.filter(c => c.name.startsWith(cmd))
+  /** Resolves a possibly abbreviated command name.
+   *
+   *  A single prefix match wins; otherwise only an exact name match is accepted.
+   *  This lets commands be added freely while requiring just enough of the name
+   *  to disambiguate.
+   */
+  private def uniqueCommand(cmd: String): Option[LoopCommand] = {
+    val candidates = matchingCommands(cmd)
+    if (candidates.lengthCompare(1) == 0) candidates.headOption
+    // exact match OK even if otherwise appears ambiguous
+    else candidates find (_.name == cmd)
+  }
+
+  /** `:history [num]` — prints the last `num` history entries (20 when `num`
+   *  is missing or not a number), each prefixed with its line number.
+   */
+  lazy val historyCommand = new LoopCommand("history", "show the history (optional num is commands to show)") {
+    override def usage = "[num]"
+    def defaultLines = 20
+
+    def apply(line: String): Result =
+      if (history eq NoHistory) "No history available."
+      else {
+        val args      = words(line)
+        val current   = history.index
+        // A missing or non-numeric argument falls back to the default.
+        val requested = try args.head.toInt catch { case _: Exception => defaultLines }
+        val shown     = history.asStrings takeRight requested
+        val firstNo   = current - shown.size + 1
+
+        shown.zipWithIndex foreach { case (entry, i) =>
+          echo("%3d  %s".format(i + firstNo, entry))
+        }
+      }
+  }
+
+  /** Echoes `msg` on a new line and then redraws the input line. Intended for
+   *  messages that probably interrupt a line the user is typing.
+   */
+  protected def echoAndRefresh(msg: String) = {
+    val onNewLine = "\n" + msg
+    echo(onNewLine)
+    in.redrawLine()
+  }
+  /** Prints `msg` followed by a newline to `out` and flushes. */
+  protected def echo(msg: String) = {
+    out.println(msg)
+    out.flush()
+  }
+
+  /** `:h? <string>` — prints every history entry containing the given text
+   *  (case-insensitively), prefixed with its line number.
+   */
+  def searchHistory(_cmdline: String): Unit = {
+    val needle = _cmdline.toLowerCase
+    val base   = history.index - history.size + 1
+
+    history.asStrings.zipWithIndex foreach { case (entry, i) =>
+      if (entry.toLowerCase contains needle)
+        echo("%d %s".format(i + base, entry))
+    }
+  }
+
+  // Prompt text, read once from Properties.shellPromptString when this loop is constructed.
+  private val currentPrompt = Properties.shellPromptString
+
+  /** Prompt to print when awaiting input */
+  def prompt = currentPrompt
+
+  import LoopCommand.{ cmd, nullary }
+
+  /** Standard commands.
+    *
+    * Each `cmd` entry gives a name, a usage string, a help text and a handler;
+    * `nullary` entries give a name, a help text and a handler that takes no argument.
+    * The implicits, power, type and kind commands are commented out in this version.
+    */
+  lazy val standardCommands = List(
+    cmd("cp", "<path>", "add a jar or directory to the classpath", addClasspath),
+    cmd("edit", "<id>|<line>", "edit history", editCommand),
+    cmd("help", "[command]", "print this summary or command-specific help", helpCommand),
+    historyCommand,
+    cmd("h?", "<string>", "search the history", searchHistory),
+    cmd("imports", "[name name ...]", "show import history, identifying sources of names", importsCommand),
+    //cmd("implicits", "[-v]", "show the implicits in scope", intp.implicitsCommand),
+    cmd("javap", "<path|class>", "disassemble a file or class name", javapCommand),
+    cmd("line", "<id>|<line>", "place line(s) at the end of history", lineCommand),
+    cmd("load", "<path>", "interpret lines in a file", loadCommand),
+    cmd("paste", "[-raw] [path]", "enter paste mode or paste a file", pasteCommand),
+    // nullary("power", "enable power user mode", powerCmd),
+    nullary("quit", "exit the interpreter", () => Result(keepRunning = false, None)),
+    nullary("replay", "reset execution and replay all previous commands", replay),
+    nullary("reset", "reset the repl to its initial state, forgetting all session entries", resetCommand),
+    cmd("save", "<path>", "save replayable session to a file", saveCommand),
+    shCommand,
+    cmd("settings", "[+|-]<options>", "+enable/-disable flags, set compiler options", changeSettings),
+    nullary("silent", "disable/enable automatic printing of results", verbosity),
+//    cmd("type", "[-v] <expr>", "display the type of an expression without evaluating it", typeCommand),
+//    cmd("kind", "[-v] <expr>", "display the kind of expression's type", kindCommand),
+    nullary("warnings", "show the suppressed warnings from the most recent line which had any", warningsCommand)
+  )
+
+  /** Power user commands */
+//  lazy val powerCommands: List[LoopCommand] = List(
+//    cmd("phase", "<phase>", "set the implicit phase for power commands", phaseCommand)
+//  )
+
+  /** `:imports [name ...]` — prints one numbered line per import handler that
+   *  imports at least one symbol: the import string, counts of imported types,
+   *  terms and implicits, and which of the given names that import provides.
+   */
+  private def importsCommand(line: String): Result = {
+    val tokens   = words(line)
+    val handlers = intp.languageWildcardHandlers ++ intp.importHandlers
+    val nonEmpty = handlers.filterNot(_.importedSymbols.isEmpty)
+
+    for ((handler, idx) <- nonEmpty.zipWithIndex) {
+      val (types, terms) = handler.importedSymbols partition (_.name.isTypeName)
+      val imps           = handler.implicitSymbols
+      val found          = tokens filter (handler importsSymbolNamed _)
+      val typeMsg        = if (types.isEmpty) "" else types.size + " types"
+      val termMsg        = if (terms.isEmpty) "" else terms.size + " terms"
+      val implicitMsg    = if (imps.isEmpty) "" else imps.size + " are implicit"
+      val foundMsg       = if (found.isEmpty) "" else found.mkString(" // imports: ", ", ", "")
+      // Only the non-empty parts appear inside the parentheses.
+      val statsMsg       = List(typeMsg, termMsg, implicitMsg).filterNot(_ == "").mkString("(", ", ", ")")
+      val report         = "%2d) %-30s %s%s".format(idx + 1, handler.importString, statsMsg, foundMsg)
+
+      intp.reporter.printMessage(report)
+    }
+  }
+
+  // Location of the platform tools jar, if PathResolver can find one.
+  private def findToolsJar() = PathResolver.SupplementalLocations.platformTools
+
+  /** Returns a class loader through which javap is available, or the
+   *  interpreter's own class loader when it is not.
+   */
+  private def addToolsJarToLoader() = {
+    // Layer the tools jar over the interpreter's loader when one was found.
+    val cl = findToolsJar() match {
+      case Some(tools) => ScalaClassLoader.fromURLs(Seq(tools.toURL), intp.classLoader)
+      case _           => intp.classLoader
+    }
+    if (!Javap.isAvailable(cl)) {
+      repldbg(":javap unavailable: no tools.jar at " + jdkHome)
+      intp.classLoader
+    }
+    else {
+      repldbg(":javap available.")
+      cl
+    }
+  }
+//
+//  protected def newJavap() =
+//    JavapClass(addToolsJarToLoader(), new IMain.ReplStrippingWriter(intp), Some(intp))
+//
+//  private lazy val javap = substituteAndLog[Javap]("javap", NoJavap)(newJavap())
+
+  // Still todo: modules.
+//  private def typeCommand(line0: String): Result = {
+//    line0.trim match {
+//      case "" => ":type [-v] <expression>"
+//      case s  => intp.typeCommandInternal(s stripPrefix "-v " trim, verbose = s startsWith "-v ")
+//    }
+//  }
+
+//  private def kindCommand(expr: String): Result = {
+//    expr.trim match {
+//      case "" => ":kind [-v] <expression>"
+//      case s  => intp.kindCommandInternal(s stripPrefix "-v " trim, verbose = s startsWith "-v ")
+//    }
+//  }
+
+  /** `:warnings` — re-reports the interpreter's cached warnings through its
+   *  reporter, or says that there are none.
+   */
+  private def warningsCommand(): Result = {
+    if (intp.lastWarnings.nonEmpty)
+      intp.lastWarnings foreach { case (pos, msg) => intp.reporter.warning(pos, msg) }
+    else
+      "Can't find any cached warnings."
+  }
+
+  /** `:settings [+|-]<options>` — with no arguments, lists the settings the
+   *  user has set; otherwise applies the given options to the live settings.
+   *
+   *  Arguments starting with `+` turn a boolean flag on. The remaining
+   *  arguments are first validated against a scratch `Settings` instance;
+   *  those that name boolean flags turn the flag off, the others are
+   *  processed as regular options.
+   */
+  private def changeSettings(args: String): Result = {
+    // Print every user-set setting, sorted.
+    def showSettings() = {
+      for (s <- settings.userSetSettings.toSeq.sorted) echo(s.toString)
+    }
+    def updateSettings() = {
+      // put aside +flag options
+      val (pluses, rest) = (args split "\\s+").toList partition (_.startsWith("+"))
+      // Validate the non-plus arguments on a throwaway Settings before touching the real one.
+      val tmps = new Settings
+      val (ok, leftover) = tmps.processArguments(rest, processAll = true)
+      if (!ok) echo("Bad settings request.")
+      else if (leftover.nonEmpty) echo("Unprocessed settings.")
+      else {
+        // boolean flags set-by-user on tmp copy should be off, not on
+        val offs = tmps.userSetSettings filter (_.isInstanceOf[Settings#BooleanSetting])
+        val (minuses, nonbools) = rest partition (arg => offs exists (_ respondsTo arg))
+        // update non-flags
+        settings.processArguments(nonbools, processAll = true)
+        // also snag multi-value options for clearing, e.g. -Ylog: and -language:
+        for {
+          s <- settings.userSetSettings
+          if s.isInstanceOf[Settings#MultiStringSetting] || s.isInstanceOf[Settings#PhasesSetting]
+          if nonbools exists (arg => arg.head == '-' && arg.last == ':' && (s respondsTo arg.init))
+        } s match {
+          case c: Clearable => c.clear()
+          case _ =>
+        }
+        // Looks up each option by (transformed) name and applies `setter` to it,
+        // reporting names that are unknown or are not boolean flags.
+        def update(bs: Seq[String], name: String=>String, setter: Settings#Setting=>Unit) = {
+          for (b <- bs)
+            settings.lookupSetting(name(b)) match {
+              case Some(s) =>
+                if (s.isInstanceOf[Settings#BooleanSetting]) setter(s)
+                else echo(s"Not a boolean flag: $b")
+              case _ =>
+                echo(s"Not an option: $b")
+            }
+        }
+        update(minuses, identity, _.tryToSetFromPropertyValue("false"))  // turn off
+        update(pluses, "-" + _.drop(1), _.tryToSet(Nil))                 // turn on
+      }
+    }
+    if (args.isEmpty) showSettings() else updateSettings()
+  }
+
+  /** `:javap` — disassembly is not wired up in this shell.
+   *
+   *  The javap implementation is commented out (kept below for reference), so
+   *  the command used to do nothing at all even though it is listed among the
+   *  standard commands. It now tells the user it is unavailable, using the
+   *  same String-to-Result message path as the other commands.
+   *
+   *  @param line the arguments given to :javap (currently ignored)
+   *  @return a Result that keeps the loop running
+   */
+  private def javapCommand(line: String): Result = {
+//    if (javap == null)
+//      ":javap unavailable, no tools.jar at %s.  Set JDK_HOME.".format(jdkHome)
+//    else if (line == "")
+//      ":javap [-lcsvp] [path1 path2 ...]"
+//    else
+//      javap(words(line)) foreach { res =>
+//        if (res.isError) return "Failed: " + res.value
+//        else res.show()
+//      }
+    ":javap is not available in this shell."
+  }
+
+  // Source path of the phase wrapper, built from the interpreter's original path for "$r".
+  private def pathToPhaseWrapper = intp.originalPath("$r") + ".phased.atCurrent"
+
+  /** `:phase` handler. Its entire implementation is commented out, so the
+   *  method currently has an empty body and ignores `name`.
+   */
+  private def phaseCommand(name: String): Result = {
+//    val phased: Phased = power.phased
+//    import phased.NoPhaseName
+//
+//    if (name == "clear") {
+//      phased.set(NoPhaseName)
+//      intp.clearExecutionWrapper()
+//      "Cleared active phase."
+//    }
+//    else if (name == "") phased.get match {
+//      case NoPhaseName => "Usage: :phase <expr> (e.g. typer, erasure.next, erasure+3)"
+//      case ph          => "Active phase is '%s'.  (To clear, :phase clear)".format(phased.get)
+//    }
+//    else {
+//      val what = phased.parse(name)
+//      if (what.isEmpty || !phased.set(what))
+//        "'" + name + "' does not appear to represent a valid phase."
+//      else {
+//        intp.setExecutionWrapper(pathToPhaseWrapper)
+//        val activeMessage =
+//          if (what.toString.length == name.length) "" + what
+//          else "%s (%s)".format(what, name)
+//
+//        "Active phase is now: " + activeMessage
+//      }
+//    }
+  }
+
+  /** Available commands: the standard ones only, because the power-mode
+   *  commands are disabled in this version.
+   */
+  def commands: List[LoopCommand] = {
+    // if (isReplPower) powerCommands else Nil
+    val extraCommands = Nil
+    standardCommands ++ extraCommands
+  }
+
+  // Question put to the user by `crashRecovery` after an exception escapes a line.
+  // The text is trimmed first and the `|` margins are stripped afterwards.
+  val replayQuestionMessage =
+    """|That entry seems to have slain the compiler.  Shall I replay
+      |your session? I can re-run each line except the last one.
+      |[y/n]
+    """.trim.stripMargin
+
+  /** Handler used by `loop` for anything thrown while processing a line.
+   *
+   *  It echoes the error, rethrows NoSuchMethodError and NoClassDefFoundError
+   *  as unrecoverable, and otherwise asks whether to replay the session. When
+   *  it does not rethrow it returns true, i.e. the loop keeps running.
+   */
+  private val crashRecovery: PartialFunction[Throwable, Boolean] = {
+    case ex: Throwable =>
+      // Before initialization completes only the raw exception message is
+      // shown, and the replay question is prefixed with an explanation.
+      val (err, explain) = (
+        if (intp.isInitializeComplete)
+          (intp.global.throwableAsString(ex), "")
+        else
+          (ex.getMessage, "The compiler did not initialize.\n")
+        )
+      echo(err)
+
+      ex match {
+        case _: NoSuchMethodError | _: NoClassDefFoundError =>
+          echo("\nUnrecoverable error.")
+          throw ex
+        case _  =>
+          // Ask until the answer is y or n; a RuntimeException while reading counts as "no".
+          def fn(): Boolean =
+            try in.readYesOrNo(explain + replayQuestionMessage, { echo("\nYou must enter y or n.") ; fn() })
+            catch { case _: RuntimeException => false }
+
+          if (fn()) replay()
+          else echo("\nAbandoning crashed session.")
+      }
+      true
+  }
+
+  /** Processes one line of input.
+   *
+   *  Waits up to 60 seconds for interpreter initialization to finish, then
+   *  runs the line as a command and records it for replay if the command asks
+   *  for that.
+   *
+   *  @return false if the repl should exit, which includes a null line
+   */
+  def processLine(line: String): Boolean = {
+    import scala.concurrent.duration._
+    Await.ready(globalFuture, 60.seconds)
+
+    if (line eq null) false
+    else command(line) match {
+      case Result(false, _)          => false
+      case Result(_, Some(recorded)) => addReplay(recorded) ; true
+      case _                         => true
+    }
+  }
+
+  // Flushes pending output, then reads one line from `in` using the current prompt.
+  private def readOneLine() = {
+    out.flush()
+    in.readLine(prompt)
+  }
+
+  /** The main read-eval-print loop for the repl. It reads and processes one
+    *  line at a time and stops when processing reports that the repl should
+    *  exit. Exceptions thrown along the way are handled by `crashRecovery`.
+    */
+  @tailrec final def loop(): Unit = {
+    val keepGoing =
+      try processLine(readOneLine())
+      catch crashRecovery
+    if (keepGoing) loop()
+  }
+
+  /** Interprets every line of `file` by temporarily reading input from it.
+    * The previous reader and the replay stack are restored afterwards.
+    */
+  def interpretAllFrom(file: File): Unit =
+    savingReader {
+      savingReplayStack {
+        file applyReader { fileReader =>
+          in = SimpleReader(fileReader, out, interactive = false)
+          echo("Loading " + file + "...")
+          loop()
+        }
+      }
+    }
+
+  /** Resets the interpreter and re-runs every recorded command in the order
+    * it was originally entered.
+    */
+  def replay(): Unit = {
+    reset()
+    replayCommands match {
+      case Nil      => echo("Nothing to replay.")
+      case recorded =>
+        recorded foreach { cmd =>
+          echo("Replaying: " + cmd)  // flush because maybe cmd will have its own output
+          command(cmd)
+          echo("")
+        }
+    }
+  }
+  /** `:reset` — announces what is about to be forgotten (recorded commands,
+    * named terms, defined types), clears the replay stack and resets the
+    * interpreter.
+    */
+  def resetCommand(): Unit = {
+    echo("Resetting interpreter state.")
+    if (!replayCommandStack.isEmpty) {
+      echo("Forgetting this session history:\n")
+      for (recorded <- replayCommands) echo(recorded)
+      echo("")
+      replayCommandStack = Nil
+    }
+    if (intp.namedDefinedTerms.nonEmpty) {
+      val terms = intp.namedDefinedTerms.mkString(", ")
+      echo("Forgetting all expression results and named terms: " + terms)
+    }
+    if (intp.definedTypes.nonEmpty) {
+      val types = intp.definedTypes.mkString(", ")
+      echo("Forgetting defined types: " + types)
+    }
+
+    reset()
+  }
+  /** Resets the interpreter, then re-applies the power-mode setup. */
+  def reset(): Unit = {
+    intp.reset()
+    unleashAndSetPhase()
+  }
+
+  /** `:line` — same as `:edit` but with no editor. */
+  def lineCommand(what: String): Result = {
+    editCommand(what, None)
+  }
+
+  /** `:edit id` or `:edit line` — uses the editor named by the EDITOR environment variable, if set. */
+  def editCommand(what: String): Result = {
+    val configuredEditor = Properties.envOrNone("EDITOR")
+    editCommand(what, configuredEditor)
+  }
+
+  /** Edits a previous definition or a range of history lines.
+    *
+    * `what` is either "-" (the most recent variable), an identifier, or a line
+    * spec made of digits, '+' and '-'. With an editor, the selected text is
+    * written to a temp file, the editor is run on it and the result is
+    * interpreted. Without one, the text is placed in the history, or echoed
+    * if the history cannot be changed.
+    *
+    * @param what   identifier, "-" or line spec (123, 120+3, -3, 120-123, 120-)
+    * @param editor command used to launch an editor, if any
+    */
+  def editCommand(what: String, editor: Option[String]): Result = {
+    // Incomplete input: compile it wrapped in an object so the compiler can say why.
+    def diagnose(code: String) = {
+      echo("The edited code is incomplete!\n")
+      val errless = intp compileSources new BatchSourceFile("<pastie>", s"object pastel {\n$code\n}")
+      if (errless) echo("The compiler reports no errors.")
+    }
+    // Appends each line of `text` to a JLine history; returns false for any other history.
+    def historicize(text: String) = history match {
+      case jlh: JLineHistory => text.lines foreach jlh.add ; jlh.moveToEnd() ; true
+      case _ => false
+    }
+    def edit(text: String): Result = editor match {
+      case Some(ed) =>
+        val tmp = File.makeTemp()
+        tmp.writeAll(text)
+        try {
+          // Run "<editor> <temp file>" and act on its exit code.
+          val pr = new ProcessResult(s"$ed ${tmp.path}")
+          pr.exitCode match {
+            case 0 =>
+              tmp.safeSlurp() match {
+                case Some(edited) if edited.trim.isEmpty => echo("Edited text is empty.")
+                case Some(edited) =>
+                  echo(edited.lines map ("+" + _) mkString "\n")
+                  val res = intp interpret edited
+                  if (res == IR.Incomplete) diagnose(edited)
+                  else {
+                    historicize(edited)
+                    Result(lineToRecord = Some(edited), keepRunning = true)
+                  }
+                case None => echo("Can't read edited text. Did you delete it?")
+              }
+            case x => echo(s"Error exit from $ed ($x), ignoring")
+          }
+        } finally {
+          // The temp file is removed whatever the editor or interpreter did.
+          tmp.delete()
+        }
+      case None =>
+        if (historicize(text)) echo("Placing text in recent history.")
+        else echo(f"No EDITOR defined and you can't change history, echoing your text:%n$text")
+    }
+
+    // if what is a number, use it as a line number or range in history
+    // (note: an empty `what` also satisfies this and ends up as a "Bad range")
+    def isNum = what forall (c => c.isDigit || c == '-' || c == '+')
+    // except that "-" means last value
+    def isLast = (what == "-")
+    if (isLast || !isNum) {
+      // Identifier: edit the line of the first request found that defines its symbol.
+      val name = if (isLast) intp.mostRecentVar else what
+      val sym = intp.symbolOfIdent(name)
+      intp.prevRequestList collectFirst { case r if r.defines contains sym => r } match {
+        case Some(req) => edit(req.line)
+        case None      => echo(s"No symbol in scope: $what")
+      }
+    } else try {
+      val s = what
+      // line 123, 120+3, -3, 120-123, 120-, note -3 is not 0-3 but (cur-3,cur)
+      val (start, len) =
+        if ((s indexOf '+') > 0) {
+          val (a,b) = s splitAt (s indexOf '+')
+          (a.toInt, b.drop(1).toInt)
+        } else {
+          (s indexOf '-') match {
+            case -1 => (s.toInt, 1)
+            case 0  => val n = s.drop(1).toInt ; (history.index - n, n)
+            case _ if s.last == '-' => val n = s.init.toInt ; (n, history.index - n)
+            case i  => val n = s.take(i).toInt ; (n, s.drop(i+1).toInt - n)
+          }
+        }
+      import scala.collection.JavaConverters._
+      // History lines are 1-based for the user; clamp to the first entry.
+      val index = (start - 1) max 0
+      val text = history match {
+        case jlh: JLineHistory => jlh.entries(index).asScala.take(len) map (_.value) mkString "\n"
+        case _ => history.asStrings.slice(index, index + len) mkString "\n"
+      }
+      edit(text)
+    } catch {
+      case _: NumberFormatException => echo(s"Bad range '$what'")
+        echo("Use line 123, 120+3, -3, 120-123, 120-, note -3 is not 0-3 but (cur-3,cur)")
+    }
+  }
+
+  /** `:sh <command line>` — interprets an expression that constructs a
+    * ProcessResult for the given command line; shows usage when no command
+    * is given.
+    */
+  lazy val shCommand = new LoopCommand("sh", "run a shell command (result is implicitly => List[String])") {
+    override def usage = "<command line>"
+    def apply(line: String): Result =
+      if (line == "") showUsage()
+      else {
+        // Build the source for `new ProcessResult("<line>")` and let the interpreter run it.
+        val toRun = s"new ${classOf[ProcessResult].getName}(${string2codeQuoted(line)})"
+        intp.interpret(toRun)
+        ()
+      }
+  }
+
+  /** Applies `action` to the named file if it exists.
+    *
+    * @return the action's result, or None (after telling the user) when the
+    *         file does not exist
+    */
+  def withFile[A](filename: String)(action: File => A): Option[A] = {
+    val target = File(filename)
+    if (target.exists) Some(action(target))
+    else {
+      echo("That file does not exist")  // courtesy side-effect
+      None
+    }
+  }
+
+  /** `:load <path>` — interprets every line of the file. The `:load` line is
+    * recorded for replay only when the file exists.
+    */
+  def loadCommand(arg: String) = {
+    val replayLine: Option[String] = withFile(arg) { f =>
+      interpretAllFrom(f)
+      ":load " + arg
+    }
+    Result(keepRunning = true, replayLine)
+  }
+
+  /** `:save <path>` — writes the recorded replay commands to the file, one
+    * per line. Complains when no file name is given or nothing has been recorded.
+    */
+  def saveCommand(filename: String): Result = filename match {
+    case f if f.isEmpty                   => echo("File name is required.")
+    case _ if replayCommandStack.isEmpty  => echo("No replay commands in session")
+    case f                                => File(f).printlnAll(replayCommands: _*)
+  }
+
+  /** `:cp <path>` — adds an existing jar or directory to the extra classpath,
+    * prints the resulting classpath and replays the session.
+    */
+  def addClasspath(arg: String): Unit = {
+    val entry = File(arg).normalize
+    if (!entry.exists)
+      echo("The path '" + entry + "' doesn't seem to exist.")
+    else {
+      addedClasspath = ClassPath.join(addedClasspath, entry.path)
+      val totalClasspath = ClassPath.join(settings.classpath.value, addedClasspath)
+      echo("Added '%s'.  Your new classpath is:\n\"%s\"".format(entry.path, totalClasspath))
+      replay()
+    }
+  }
+
+  /** `:power` handler — enables power mode unless it is already on. */
+  def powerCmd(): Result = {
+    if (!isReplPower) enablePowerMode(isDuringInit = false)
+    else "Already in power mode."
+  }
+  /** Sets the power property to true and runs the (currently empty) power setup.
+    * `isDuringInit` is unused while the banner echo is commented out.
+    */
+  def enablePowerMode(isDuringInit: Boolean) = {
+    replProps.power setValue true
+    unleashAndSetPhase()
+    // asyncEcho(isDuringInit, power.banner)
+  }
+  // Power-mode setup hook. Both steps are commented out, so at present this
+  // only evaluates `isReplPower` and does nothing else.
+  private def unleashAndSetPhase() {
+    if (isReplPower) {
+    //  power.unleash()
+      // Set the phase to "typer"
+      // intp beSilentDuring phaseCommand("typer")
+    }
+  }
+
+  /** Echoes `msg` through `asyncMessage` when `async` is set, otherwise directly. */
+  def asyncEcho(async: Boolean, msg: => String): Unit =
+    if (!async) echo(msg)
+    else asyncMessage(msg)
+
+  /** `:silent` — toggles automatic printing of results and reports the new state. */
+  def verbosity() = {
+    val wasPrinting = intp.printResults
+    intp.printResults = !wasPrinting
+    val nowState = if (wasPrinting) "off" else "on"
+    echo("Switched " + nowState + " result printing.")
+  }
+
+  /** Runs one line submitted by the user. The Result carries two values:
+    * (1) whether to keep running, (2) the line to record for replay, if any.
+    *
+    * Lines starting with ":" are dispatched to the matching loop command;
+    * anything else is interpreted as Scala code.
+    */
+  def command(line: String): Result =
+    if (!line.startsWith(":")) {
+      if (intp.global == null) Result(keepRunning = false, None)  // Notice failure to create compiler
+      else Result(keepRunning = true, interpretStartingWith(line))
+    }
+    else {
+      val afterColon = line.tail
+      val cmd        = afterColon takeWhile (ch => !ch.isWhitespace)
+      uniqueCommand(cmd) match {
+        // Pass the command everything after its name, minus leading whitespace.
+        case Some(lc) => lc(afterColon.stripPrefix(cmd).dropWhile(_.isWhitespace))
+        case _        => ambiguousError(cmd)
+      }
+    }
+
+  // Lazily reads lines (with an empty prompt) until end of input or until `cond` fails.
+  private def readWhile(cond: String => Boolean) =
+    Iterator.continually(in.readLine("")).takeWhile(ln => ln != null && cond(ln))
+
+  /** `:paste [-raw] [path]` — reads a block of code, either from the named
+    * file or interactively until end of input, and then interprets it, or
+    * compiles it as-is when `-raw` is given.
+    *
+    * Pasting from a file records the `:paste` line for replay.
+    */
+  def pasteCommand(arg: String): Result = {
+    var shouldReplay: Option[String] = None
+    // A def, so it picks up whatever shouldReplay holds when it is evaluated.
+    def result = Result(keepRunning = true, shouldReplay)
+    val (raw, file) =
+      if (arg.isEmpty) (false, None)
+      else {
+        // Groups: optional "-raw" flag, optional whitespace, optional name not starting with '-'.
+        val r = """(-raw)?(\s+)?([^\-]\S*)?""".r
+        arg match {
+          case r(flag, sep, name) =>
+            // Flag and name run together without a separator, e.g. "-rawfile".
+            if (flag != null && name != null && sep == null)
+              echo(s"""I assume you mean "$flag $name"?""")
+            (flag != null, Option(name))
+          case _ =>
+            echo("usage: :paste -raw file")
+            return result
+        }
+      }
+    val code = file match {
+      case Some(name) =>
+        withFile(name)(f => {
+          shouldReplay = Some(s":paste $arg")
+          val s = f.slurp.trim
+          if (s.isEmpty) echo(s"File contains no code: $f")
+          else echo(s"Pasting file $f...")
+          s
+        }) getOrElse ""
+      case None =>
+        echo("// Entering paste mode (ctrl-D to finish)\n")
+        val text = (readWhile(_ => true) mkString "\n").trim
+        if (text.isEmpty) echo("\n// Nothing pasted, nothing gained.\n")
+        else echo("\n// Exiting paste mode, now interpreting.\n")
+        text
+    }
+    def interpretCode() = {
+      val res = intp interpret code
+      // if input is incomplete, let the compiler try to say why
+      if (res == IR.Incomplete) {
+        echo("The pasted code is incomplete!\n")
+        // Remembrance of Things Pasted in an object
+        val errless = intp compileSources new BatchSourceFile("<pastie>", s"object pastel {\n$code\n}")
+        if (errless) echo("...but compilation found no error? Good luck with that.")
+      }
+    }
+    // -raw: hand the text to the compiler unchanged, as a source file.
+    def compileCode() = {
+      val errless = intp compileSources new BatchSourceFile("<pastie>", code)
+      if (!errless) echo("There were compilation errors!")
+    }
+    if (code.nonEmpty) {
+      if (raw) compileCode() else interpretCode()
+    }
+    result
+  }
+
+  /** Handles pasted repl transcripts, i.e. input that still carries the
+    * "scala> " prompt and "     | " continuation markers.
+    */
+  private object paste extends Pasted {
+    val ContinueString = "     | "
+    val PromptString   = "scala> "
+
+    /** Echoes the trimmed line, interprets it, then prints a blank line. */
+    def interpret(line: String): Unit = {
+      echo(line.trim)
+      intp.interpret(line)
+      echo("")
+    }
+
+    /** Runs a transcript starting at `start`, reading further lines until end
+      * of input or a line that is just the prompt.
+      */
+    def transcript(start: String) = {
+      echo("\n// Detected repl transcript paste: ctrl-D to finish.\n")
+      val remaining = readWhile(ln => ln.trim != PromptString.trim)
+      apply(Iterator(start) ++ remaining)
+    }
+  }
+  import paste.{ ContinueString, PromptString }
+
+  /** Interpret expressions starting with the first line.
+    * Read lines until a complete compilation unit is available
+    * or until a syntax error has been seen.  If a full unit is
+    * read, go ahead and interpret it.  Return the full string
+    * to be recorded for replay, if any.
+    */
+  def interpretStartingWith(code: String): Option[String] = {
+    // Regular (non-completion) input has arrived, so reset the completer's verbosity.
+    in.completion.resetVerbosity()
+
+    // Interprets `code` and pairs the raw result with the line to record:
+    // nothing on error, the code itself on success, and on incomplete input
+    // whatever the recursive call on code plus one more line yields.
+    def reallyInterpret = {
+      val reallyResult = intp.interpret(code)
+      (reallyResult, reallyResult match {
+        case IR.Error       => None
+        case IR.Success     => Some(code)
+        case IR.Incomplete  =>
+          if (in.interactive && code.endsWith("\n\n")) {
+            echo("You typed two blank lines.  Starting a new command.")
+            None
+          }
+          else in.readLine(ContinueString) match {
+            case null =>
+              // we know compilation is going to fail since we're at EOF and the
+              // parser thinks the input is still incomplete, but since this is
+              // a file being read non-interactively we want to fail.  So we send
+              // it straight to the compiler for the nice error message.
+              intp.compileString(code)
+              None
+
+            case line => interpretStartingWith(code + "\n" + line)
+          }
+      })
+    }
+
+    /** Here we place ourselves between the user and the interpreter and examine
+      *  the input they are ostensibly submitting.  We intervene in several cases:
+      *
+      *  1) Empty input is ignored.
+      *  2) If the trimmed line starts with "scala> " (and no paste is already
+      *     running) it is assumed to be an interpreter transcript paste.
+      *  3) If Completion.looksLikeInvocation accepts the line and there is a most
+      *     recent variable, the line is treated as an invocation on that variable.
+      *  4) If the trimmed line starts with "//" it is a line comment and is skipped.
+      *
+      *  Everything else is handed to the interpreter.
+      */
+    if (code == "") None
+    else if (!paste.running && code.trim.startsWith(PromptString)) {
+      paste.transcript(code)
+      None
+    }
+    else if (Completion.looksLikeInvocation(code) && intp.mostRecentVar != "") {
+      interpretStartingWith(intp.mostRecentVar + code)
+    }
+    else if (code.trim startsWith "//") {
+      // line comment, do nothing
+      None
+    }
+    else
+      reallyInterpret._2
+  }
+
+  /** Runs `:load <file>` for every file passed via -i and records each load
+    * for replay. Settings that are not GenericRunnerSettings are ignored.
+    */
+  def loadFiles(settings: Settings) = settings match {
+    case runner: GenericRunnerSettings =>
+      runner.loadfiles.value foreach { filename =>
+        val loadLine = ":load " + filename
+        command(loadLine)
+        addReplay(loadLine)
+        echo("")
+      }
+    case _ =>
+  }
+
+  /** Tries to create a JLineReader, falling back to SimpleReader:
+    *  unless settings or properties are such that it should start
+    *  with SimpleReader.
+    *
+    *  @param settings consulted for `Xnojline` and `noCompletion`
+    *  @return a SimpleReader when jline is disabled, when running in an emacs
+    *          shell, or when the JLineReader cannot be constructed; otherwise
+    *          a JLineReader, with completion unless it is switched off
+    */
+  def chooseReader(settings: Settings): InteractiveReader = {
+    if (settings.Xnojline || Properties.isEmacsShell)
+      SimpleReader()
+    else try new JLineReader(
+      if (settings.noCompletion) NoCompletion
+      else new SparkJLineCompletion(intp)
+    )
+    catch {
+      // NoClassDefFoundError is not an Exception, so it is listed explicitly
+      // to cover a missing jline on the classpath.
+      case ex @ (_: Exception | _: NoClassDefFoundError) =>
+        echo("Failed to create JLineReader: " + ex + "\nFalling back to SimpleReader.")
+        SimpleReader()
+    }
+  }
+  /** Builds a TypeTag for `T` from its runtime class name.
+    *
+    * The TypeCreator looks the class up as a static class in whichever mirror
+    * it is given and casts the resulting type constructor to that universe's
+    * Type. `u` and `m` are not defined in this file — presumably the universe
+    * and mirror brought in by `import StdReplTags._` (TODO confirm).
+    */
+  protected def tagOfStaticClass[T: ClassTag]: u.TypeTag[T] =
+    u.TypeTag[T](
+      m,
+      new TypeCreator {
+        def apply[U <: Universe with Singleton](m: Mirror[U]): U # Type =
+          m.staticClass(classTag[T].runtimeClass.getName).toTypeConstructor.asInstanceOf[U # Type]
+      })
+
+  /** Finishing touches applied once the interpreter has initialized: binds
+    * `$intp`, runs any configured autorun code, sets the context class loader
+    * and finally lets a JLine reader finish its own initialization.
+    */
+  private def loopPostInit(): Unit = {
+    // Bind intp somewhere out of the regular namespace where
+    // we can get at it in generated code.
+    intp.quietBind(NamedParam[SparkIMain]("$intp", intp)(tagOfStaticClass[SparkIMain], classTag[SparkIMain]))
+    // Auto-run code via some setting.
+    for {
+      path <- replProps.replAutorunCode.option
+      code <- io.File(path).safeSlurp()
+    } intp quietRun code
+    // classloader and power mode setup
+    intp.setContextClassLoader()
+    if (isReplPower) {
+     // replProps.power setValue true
+     // unleashAndSetPhase()
+     // asyncMessage(power.banner)
+    }
+    // SI-7418 Now, and only now, can we enable TAB completion.
+    in match {
+      case jline: JLineReader => jline.consoleReader.postInit
+      case _                  =>
+    }
+  }
+  /** Runs a complete REPL session with the given settings.
+   *
+   *  Creates the interpreter, selects the input reader, initializes the
+   *  interpreter inside a future and waits up to 10 seconds for it to
+   *  complete, then prints the welcome message, initializes Spark, loads any
+   *  requested files and enters the interpreter loop. The interpreter is
+   *  closed when the loop ends, whether normally or through an exception.
+   *
+   *  @param settings the settings for this session
+   *  @return `true` whenever the method returns normally
+   */
+  def process(settings: Settings): Boolean = savingContextLoader {
+    this.settings = settings
+    createInterpreter()
+
+    // When an input reader was supplied (in0), wrap it in a SimpleReader;
+    // otherwise choose a reader depending on environmental cues.
+    in = in0 match {
+      case Some(reader) => SimpleReader(reader, out, interactive = true)
+      case None         => chooseReader(settings)
+    }
+
+    globalFuture = future {
+      intp.initializeSynchronous()
+      loopPostInit()
+      !intp.reporter.hasErrors
+    }
+
+    // Await.ready only waits for completion (throwing on timeout); the
+    // Boolean produced by the future is not inspected here.
+    import scala.concurrent.duration._
+    Await.ready(globalFuture, 10.seconds)
+
+    printWelcome()
+    initializeSpark()
+    loadFiles(settings)
+
+    try loop()
+    catch AbstractOrMissingHandler()
+    finally closeInterpreter()
+
+    true
+  }
+
+  /** Legacy entry point (used by sbt): delegates to `process` and discards its result. */
+  @deprecated("Use `process` instead", "2.9.0")
+  def main(settings: Settings): Unit = {
+    process(settings)
+    ()
+  }
+}
+
+object SparkILoop {
+  /** Lets a `SparkILoop` be used where a `SparkIMain` is expected, via its `intp`. */
+  implicit def loopToInterpreter(repl: SparkILoop): SparkIMain = repl.intp
+
+  /** Produces a transcript of what a session would look like if `code` had
+   *  been typed into the repl. Designed primarily for use by test code.
+   *
+   *  Each input line is printed to the output before it is returned to the
+   *  loop, and output consisting solely of whitespace and `|` characters
+   *  (continuation lines) is dropped.
+   *
+   *  @param code     the code to feed to the repl; it is trimmed and newline-terminated
+   *  @param settings the settings to run with; the classpath is filled in from
+   *                  the `java.class.path` system property when left at its default
+   *  @return everything written to the captured output stream
+   */
+  def runForTranscript(code: String, settings: Settings): String = {
+    import java.io.{ BufferedReader, StringReader, OutputStreamWriter }
+
+    stringFromStream { ostream =>
+      Console.withOut(ostream) {
+        val transcript = new JPrintWriter(new OutputStreamWriter(ostream), true) {
+          override def write(str: String) = {
+            // completely skip continuation lines
+            val isContinuation = str.forall(c => c.isWhitespace || c == '|')
+            if (!isContinuation) super.write(str)
+          }
+        }
+        val echoingInput = new BufferedReader(new StringReader(code.trim + "\n")) {
+          override def readLine(): String = {
+            val line = super.readLine()
+            // helping out by printing the line being interpreted.
+            Option(line).foreach(text => transcript.println(text))
+            line
+          }
+        }
+        val repl = new SparkILoop(echoingInput, transcript)
+        if (settings.classpath.isDefault) {
+          settings.classpath.value = sys.props("java.class.path")
+        }
+
+        repl.process(settings)
+      }
+    }
+  }
+
+  /** Creates an interpreter loop with default settings and feeds
+   *  the given code to it as input.
+   *
+   *  @param code the code to use as the loop's input
+   *  @param sets the settings to run with; the classpath is filled in from the
+   *              `java.class.path` system property when left at its default
+   *  @return everything written to the captured output stream
+   */
+  def run(code: String, sets: Settings = new Settings): String = {
+    import java.io.{ BufferedReader, StringReader, OutputStreamWriter }
+
+    stringFromStream { ostream =>
+      Console.withOut(ostream) {
+        val reader = new BufferedReader(new StringReader(code))
+        val writer = new JPrintWriter(new OutputStreamWriter(ostream), true)
+        val repl   = new SparkILoop(reader, writer)
+
+        if (sets.classpath.isDefault) {
+          sets.classpath.value = sys.props("java.class.path")
+        }
+
+        repl.process(sets)
+      }
+    }
+  }
+
+  /** Runs the given lines as one input, terminating each line with a newline. */
+  def run(lines: List[String]): String = run(lines.map(_ + "\n").mkString)
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org