Posted to commits@spark.apache.org by sr...@apache.org on 2017/09/24 08:40:18 UTC

[1/2] spark git commit: [SPARK-22087][SPARK-14650][WIP][BUILD][REPL][CORE] Compile Spark REPL for Scala 2.12 + other 2.12 fixes

Repository: spark
Updated Branches:
  refs/heads/master 4943ea598 -> 576c43fb4


http://git-wip-us.apache.org/repos/asf/spark/blob/576c43fb/repl/src/main/scala/org/apache/spark/repl/Main.scala
----------------------------------------------------------------------
diff --git a/repl/src/main/scala/org/apache/spark/repl/Main.scala b/repl/src/main/scala/org/apache/spark/repl/Main.scala
new file mode 100644
index 0000000..cc76a70
--- /dev/null
+++ b/repl/src/main/scala/org/apache/spark/repl/Main.scala
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.repl
+
+import java.io.File
+import java.net.URI
+import java.util.Locale
+
+import scala.tools.nsc.GenericRunnerSettings
+
+import org.apache.spark._
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
+import org.apache.spark.util.Utils
+
+object Main extends Logging {
+
+  initializeLogIfNecessary(true)
+  Signaling.cancelOnInterrupt()
+
+  val conf = new SparkConf()
+  val rootDir = conf.getOption("spark.repl.classdir").getOrElse(Utils.getLocalDir(conf))
+  val outputDir = Utils.createTempDir(root = rootDir, namePrefix = "repl")
+
+  var sparkContext: SparkContext = _
+  var sparkSession: SparkSession = _
+  // this is a public var because tests reset it.
+  var interp: SparkILoop = _
+
+  private var hasErrors = false
+
+  private def scalaOptionError(msg: String): Unit = {
+    hasErrors = true
+    // scalastyle:off println
+    Console.err.println(msg)
+    // scalastyle:on println
+  }
+
+  def main(args: Array[String]) {
+    doMain(args, new SparkILoop)
+  }
+
+  // Visible for testing
+  private[repl] def doMain(args: Array[String], _interp: SparkILoop): Unit = {
+    interp = _interp
+    val jars = Utils.getLocalUserJarsForShell(conf)
+      // Remove the file:///, file:// or file:/ scheme, if present, from each jar path
+      .map { x => if (x.startsWith("file:")) new File(new URI(x)).getPath else x }
+      .mkString(File.pathSeparator)
+    val interpArguments = List(
+      "-Yrepl-class-based",
+      "-Yrepl-outdir", s"${outputDir.getAbsolutePath}",
+      "-classpath", jars
+    ) ++ args.toList
+
+    val settings = new GenericRunnerSettings(scalaOptionError)
+    settings.processArguments(interpArguments, true)
+
+    if (!hasErrors) {
+      interp.process(settings) // Start the REPL and run its read-eval-print loop
+      Option(sparkContext).foreach(_.stop)
+    }
+  }
+
+  def createSparkSession(): SparkSession = {
+    val execUri = System.getenv("SPARK_EXECUTOR_URI")
+    conf.setIfMissing("spark.app.name", "Spark shell")
+    // SparkContext will detect this configuration and register it with the RpcEnv's
+    // file server, setting spark.repl.class.uri to the actual URI for executors to
+    // use. This is sort of ugly but since executors are started as part of SparkContext
+    // initialization in certain cases, there's an initialization order issue that prevents
+    // this from being set after SparkContext is instantiated.
+    conf.set("spark.repl.class.outputDir", outputDir.getAbsolutePath())
+    if (execUri != null) {
+      conf.set("spark.executor.uri", execUri)
+    }
+    if (System.getenv("SPARK_HOME") != null) {
+      conf.setSparkHome(System.getenv("SPARK_HOME"))
+    }
+
+    val builder = SparkSession.builder.config(conf)
+    if (conf.get(CATALOG_IMPLEMENTATION.key, "hive").toLowerCase(Locale.ROOT) == "hive") {
+      if (SparkSession.hiveClassesArePresent) {
+        // In the case that the property is not set at all, builder's config
+        // does not have this value set to 'hive' yet. The original default
+        // behavior is that when there are hive classes, we use hive catalog.
+        sparkSession = builder.enableHiveSupport().getOrCreate()
+        logInfo("Created Spark session with Hive support")
+      } else {
+        // Need to change it back to 'in-memory' if no hive classes are found
+        // in the case that the property is set to hive in spark-defaults.conf
+        builder.config(CATALOG_IMPLEMENTATION.key, "in-memory")
+        sparkSession = builder.getOrCreate()
+        logInfo("Created Spark session")
+      }
+    } else {
+      // In the case that the property is set but not to 'hive', the internal
+      // default is 'in-memory'. So the sparkSession will use in-memory catalog.
+      sparkSession = builder.getOrCreate()
+      logInfo("Created Spark session")
+    }
+    sparkContext = sparkSession.sparkContext
+    sparkSession
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/576c43fb/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala
----------------------------------------------------------------------
diff --git a/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala b/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala
new file mode 100644
index 0000000..c7ae194
--- /dev/null
+++ b/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.repl
+
+import java.io._
+import java.net.URLClassLoader
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.log4j.{Level, LogManager}
+
+import org.apache.spark.{SparkContext, SparkFunSuite}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
+
+class ReplSuite extends SparkFunSuite {
+
+  def runInterpreter(master: String, input: String): String = {
+    val CONF_EXECUTOR_CLASSPATH = "spark.executor.extraClassPath"
+
+    val in = new BufferedReader(new StringReader(input + "\n"))
+    val out = new StringWriter()
+    val cl = getClass.getClassLoader
+    var paths = new ArrayBuffer[String]
+    if (cl.isInstanceOf[URLClassLoader]) {
+      val urlLoader = cl.asInstanceOf[URLClassLoader]
+      for (url <- urlLoader.getURLs) {
+        if (url.getProtocol == "file") {
+          paths += url.getFile
+        }
+      }
+    }
+    val classpath = paths.map(new File(_).getAbsolutePath).mkString(File.pathSeparator)
+
+    val oldExecutorClasspath = System.getProperty(CONF_EXECUTOR_CLASSPATH)
+    System.setProperty(CONF_EXECUTOR_CLASSPATH, classpath)
+    Main.sparkContext = null
+    Main.sparkSession = null // causes recreation of SparkContext for each test.
+    Main.conf.set("spark.master", master)
+    Main.doMain(Array("-classpath", classpath), new SparkILoop(in, new PrintWriter(out)))
+
+    if (oldExecutorClasspath != null) {
+      System.setProperty(CONF_EXECUTOR_CLASSPATH, oldExecutorClasspath)
+    } else {
+      System.clearProperty(CONF_EXECUTOR_CLASSPATH)
+    }
+    return out.toString
+  }
+
+  // Simulate the paste mode in Scala REPL.
+  def runInterpreterInPasteMode(master: String, input: String): String =
+    runInterpreter(master, ":paste\n" + input + 4.toChar) // 4 is the ascii code of CTRL + D
+
+  def assertContains(message: String, output: String) {
+    val isContain = output.contains(message)
+    assert(isContain,
+      "Interpreter output did not contain '" + message + "':\n" + output)
+  }
+
+  def assertDoesNotContain(message: String, output: String) {
+    val isContain = output.contains(message)
+    assert(!isContain,
+      "Interpreter output contained '" + message + "':\n" + output)
+  }
+
+  test("propagation of local properties") {
+    // A mock ILoop that doesn't install the SIGINT handler.
+    class ILoop(out: PrintWriter) extends SparkILoop(None, out) {
+      settings = new scala.tools.nsc.Settings
+      settings.usejavacp.value = true
+      org.apache.spark.repl.Main.interp = this
+    }
+
+    val out = new StringWriter()
+    Main.interp = new ILoop(new PrintWriter(out))
+    Main.sparkContext = new SparkContext("local", "repl-test")
+    Main.interp.createInterpreter()
+
+    Main.sparkContext.setLocalProperty("someKey", "someValue")
+
+    // Make sure the value we set in the caller to interpret is propagated in the thread that
+    // interprets the command.
+    Main.interp.interpret("org.apache.spark.repl.Main.sparkContext.getLocalProperty(\"someKey\")")
+    assert(out.toString.contains("someValue"))
+
+    Main.sparkContext.stop()
+    System.clearProperty("spark.driver.port")
+  }
+
+  test("SPARK-15236: use Hive catalog") {
+    // Turn on INFO logging so that the code can emit an INFO log entry
+    // when "HiveMetastore" is used
+    val rootLogger = LogManager.getRootLogger()
+    val logLevel = rootLogger.getLevel
+    rootLogger.setLevel(Level.INFO)
+    try {
+      Main.conf.set(CATALOG_IMPLEMENTATION.key, "hive")
+      val output = runInterpreter("local",
+        """
+      |spark.sql("drop table if exists t_15236")
+    """.stripMargin)
+      assertDoesNotContain("error:", output)
+      assertDoesNotContain("Exception", output)
+      // The Hive catalog is used only when the config is set to hive and
+      // the Hive classes are built; in that case an INFO log entry will
+      // show HiveMetastore being used
+      if (SparkSession.hiveClassesArePresent) {
+        assertContains("HiveMetaStore", output)
+      } else {
+        // If hive classes are not built, in-memory catalog will be used
+        assertDoesNotContain("HiveMetaStore", output)
+      }
+    } finally {
+      rootLogger.setLevel(logLevel)
+    }
+  }
+
+  test("SPARK-15236: use in-memory catalog") {
+    val rootLogger = LogManager.getRootLogger()
+    val logLevel = rootLogger.getLevel
+    rootLogger.setLevel(Level.INFO)
+    try {
+      Main.conf.set(CATALOG_IMPLEMENTATION.key, "in-memory")
+      val output = runInterpreter("local",
+        """
+          |spark.sql("drop table if exists t_16236")
+        """.stripMargin)
+      assertDoesNotContain("error:", output)
+      assertDoesNotContain("Exception", output)
+      assertDoesNotContain("HiveMetaStore", output)
+    } finally {
+      rootLogger.setLevel(logLevel)
+    }
+  }
+
+  test("broadcast vars") {
+    // Test that the value that a broadcast var had when it was created is used,
+    // even if that variable is then modified in the driver program
+    // TODO: This doesn't actually work for arrays when we run in local mode!
+    val output = runInterpreter("local",
+      """
+        |var array = new Array[Int](5)
+        |val broadcastArray = sc.broadcast(array)
+        |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect()
+        |array(0) = 5
+        |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect()
+      """.stripMargin)
+    assertDoesNotContain("error:", output)
+    assertDoesNotContain("Exception", output)
+    assertContains("res0: Array[Int] = Array(0, 0, 0, 0, 0)", output)
+    assertContains("res2: Array[Int] = Array(5, 0, 0, 0, 0)", output)
+  }
+
+  if (System.getenv("MESOS_NATIVE_JAVA_LIBRARY") != null) {
+    test("running on Mesos") {
+      val output = runInterpreter("localquiet",
+        """
+          |var v = 7
+          |def getV() = v
+          |sc.parallelize(1 to 10).map(x => getV()).collect().reduceLeft(_+_)
+          |v = 10
+          |sc.parallelize(1 to 10).map(x => getV()).collect().reduceLeft(_+_)
+          |var array = new Array[Int](5)
+          |val broadcastArray = sc.broadcast(array)
+          |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect()
+          |array(0) = 5
+          |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect()
+        """.stripMargin)
+      assertDoesNotContain("error:", output)
+      assertDoesNotContain("Exception", output)
+      assertContains("res0: Int = 70", output)
+      assertContains("res1: Int = 100", output)
+      assertContains("res2: Array[Int] = Array(0, 0, 0, 0, 0)", output)
+      assertContains("res4: Array[Int] = Array(0, 0, 0, 0, 0)", output)
+    }
+  }
+
+  test("line wrapper only initialized once when used as encoder outer scope") {
+    val output = runInterpreter("local",
+      """
+        |val fileName = "repl-test-" + System.currentTimeMillis
+        |val tmpDir = System.getProperty("java.io.tmpdir")
+        |val file = new java.io.File(tmpDir, fileName)
+        |def createFile(): Unit = file.createNewFile()
+        |
+        |createFile();case class TestCaseClass(value: Int)
+        |sc.parallelize(1 to 10).map(x => TestCaseClass(x)).collect()
+        |
+        |file.delete()
+      """.stripMargin)
+    assertDoesNotContain("error:", output)
+    assertDoesNotContain("Exception", output)
+  }
+
+  test("define case class and create Dataset together with paste mode") {
+    val output = runInterpreterInPasteMode("local-cluster[1,1,1024]",
+      """
+        |import spark.implicits._
+        |case class TestClass(value: Int)
+        |Seq(TestClass(1)).toDS()
+      """.stripMargin)
+    assertDoesNotContain("error:", output)
+    assertDoesNotContain("Exception", output)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/576c43fb/repl/src/test/scala/org/apache/spark/repl/SingletonReplSuite.scala
----------------------------------------------------------------------
diff --git a/repl/src/test/scala/org/apache/spark/repl/SingletonReplSuite.scala b/repl/src/test/scala/org/apache/spark/repl/SingletonReplSuite.scala
new file mode 100644
index 0000000..ec3d790
--- /dev/null
+++ b/repl/src/test/scala/org/apache/spark/repl/SingletonReplSuite.scala
@@ -0,0 +1,408 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.repl
+
+import java.io._
+import java.net.URLClassLoader
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.commons.lang3.StringEscapeUtils
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.util.Utils
+
+/**
+ * A special test suite for the REPL in which all test cases share one REPL instance.
+ */
+class SingletonReplSuite extends SparkFunSuite {
+
+  private val out = new StringWriter()
+  private val in = new PipedOutputStream()
+  private var thread: Thread = _
+
+  private val CONF_EXECUTOR_CLASSPATH = "spark.executor.extraClassPath"
+  private val oldExecutorClasspath = System.getProperty(CONF_EXECUTOR_CLASSPATH)
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+
+    val cl = getClass.getClassLoader
+    var paths = new ArrayBuffer[String]
+    if (cl.isInstanceOf[URLClassLoader]) {
+      val urlLoader = cl.asInstanceOf[URLClassLoader]
+      for (url <- urlLoader.getURLs) {
+        if (url.getProtocol == "file") {
+          paths += url.getFile
+        }
+      }
+    }
+    val classpath = paths.map(new File(_).getAbsolutePath).mkString(File.pathSeparator)
+
+    System.setProperty(CONF_EXECUTOR_CLASSPATH, classpath)
+    Main.conf.set("spark.master", "local-cluster[2,1,1024]")
+    val interp = new SparkILoop(
+      new BufferedReader(new InputStreamReader(new PipedInputStream(in))),
+      new PrintWriter(out))
+
+    // Force creation of a new SparkContext
+    Main.sparkContext = null
+    Main.sparkSession = null
+
+    // Starts a new thread to run the REPL interpreter, so that we won't block.
+    thread = new Thread(new Runnable {
+      override def run(): Unit = Main.doMain(Array("-classpath", classpath), interp)
+    })
+    thread.setDaemon(true)
+    thread.start()
+
+    waitUntil(() => out.toString.contains("Type :help for more information"))
+  }
+
+  override def afterAll(): Unit = {
+    in.close()
+    thread.join()
+    if (oldExecutorClasspath != null) {
+      System.setProperty(CONF_EXECUTOR_CLASSPATH, oldExecutorClasspath)
+    } else {
+      System.clearProperty(CONF_EXECUTOR_CLASSPATH)
+    }
+    super.afterAll()
+  }
+
+  private def waitUntil(cond: () => Boolean): Unit = {
+    import scala.concurrent.duration._
+    import org.scalatest.concurrent.Eventually._
+
+    eventually(timeout(50.seconds), interval(500.millis)) {
+      assert(cond(), "current output: " + out.toString)
+    }
+  }
+
+  /**
+   * Run the given commands string in a globally shared interpreter instance. Note that the given
+   * commands should not crash the interpreter, to not affect other test cases.
+   */
+  def runInterpreter(input: String): String = {
+    val currentOffset = out.getBuffer.length()
+    // append a special statement to the end of the given code, so that we can know what's
+    // the final output of this code snippet and rely on it to wait until the output is ready.
+    val timestamp = System.currentTimeMillis()
+    in.write((input + s"\nval _result_$timestamp = 1\n").getBytes)
+    in.flush()
+    val stopMessage = s"_result_$timestamp: Int = 1"
+    waitUntil(() => out.getBuffer.substring(currentOffset).contains(stopMessage))
+    out.getBuffer.substring(currentOffset)
+  }
+
+  def assertContains(message: String, output: String) {
+    val isContain = output.contains(message)
+    assert(isContain,
+      "Interpreter output did not contain '" + message + "':\n" + output)
+  }
+
+  def assertDoesNotContain(message: String, output: String) {
+    val isContain = output.contains(message)
+    assert(!isContain,
+      "Interpreter output contained '" + message + "':\n" + output)
+  }
+
+  test("simple foreach with accumulator") {
+    val output = runInterpreter(
+      """
+        |val accum = sc.longAccumulator
+        |sc.parallelize(1 to 10).foreach(x => accum.add(x))
+        |val res = accum.value
+      """.stripMargin)
+    assertDoesNotContain("error:", output)
+    assertDoesNotContain("Exception", output)
+    assertContains("res: Long = 55", output)
+  }
+
+  test("external vars") {
+    val output = runInterpreter(
+      """
+        |var v = 7
+        |val res1 = sc.parallelize(1 to 10).map(x => v).collect().reduceLeft(_+_)
+        |v = 10
+        |val res2 = sc.parallelize(1 to 10).map(x => v).collect().reduceLeft(_+_)
+      """.stripMargin)
+    assertDoesNotContain("error:", output)
+    assertDoesNotContain("Exception", output)
+    assertContains("res1: Int = 70", output)
+    assertContains("res2: Int = 100", output)
+  }
+
+  test("external classes") {
+    val output = runInterpreter(
+      """
+        |class C {
+        |def foo = 5
+        |}
+        |val res = sc.parallelize(1 to 10).map(x => (new C).foo).collect().reduceLeft(_+_)
+      """.stripMargin)
+    assertDoesNotContain("error:", output)
+    assertDoesNotContain("Exception", output)
+    assertContains("res: Int = 50", output)
+  }
+
+  test("external functions") {
+    val output = runInterpreter(
+      """
+        |def double(x: Int) = x + x
+        |val res = sc.parallelize(1 to 10).map(x => double(x)).collect().reduceLeft(_+_)
+      """.stripMargin)
+    assertDoesNotContain("error:", output)
+    assertDoesNotContain("Exception", output)
+    assertContains("res: Int = 110", output)
+  }
+
+  test("external functions that access vars") {
+    val output = runInterpreter(
+      """
+        |var v = 7
+        |def getV() = v
+        |val res1 = sc.parallelize(1 to 10).map(x => getV()).collect().reduceLeft(_+_)
+        |v = 10
+        |val res2 = sc.parallelize(1 to 10).map(x => getV()).collect().reduceLeft(_+_)
+      """.stripMargin)
+    assertDoesNotContain("error:", output)
+    assertDoesNotContain("Exception", output)
+    assertContains("res1: Int = 70", output)
+    assertContains("res2: Int = 100", output)
+  }
+
+  test("broadcast vars") {
+    // Test that the value that a broadcast var had when it was created is used,
+    // even if that variable is then modified in the driver program
+    val output = runInterpreter(
+      """
+        |var array = new Array[Int](5)
+        |val broadcastArray = sc.broadcast(array)
+        |val res1 = sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect()
+        |array(0) = 5
+        |val res2 = sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect()
+      """.stripMargin)
+    assertDoesNotContain("error:", output)
+    assertDoesNotContain("Exception", output)
+    assertContains("res1: Array[Int] = Array(0, 0, 0, 0, 0)", output)
+    assertContains("res2: Array[Int] = Array(0, 0, 0, 0, 0)", output)
+  }
+
+  test("interacting with files") {
+    val tempDir = Utils.createTempDir()
+    val out = new FileWriter(tempDir + "/input")
+    out.write("Hello world!\n")
+    out.write("What's up?\n")
+    out.write("Goodbye\n")
+    out.close()
+    val output = runInterpreter(
+      """
+        |var file = sc.textFile("%s").cache()
+        |val res1 = file.count()
+        |val res2 = file.count()
+        |val res3 = file.count()
+      """.stripMargin.format(StringEscapeUtils.escapeJava(
+        tempDir.getAbsolutePath + File.separator + "input")))
+    assertDoesNotContain("error:", output)
+    assertDoesNotContain("Exception", output)
+    assertContains("res1: Long = 3", output)
+    assertContains("res2: Long = 3", output)
+    assertContains("res3: Long = 3", output)
+    Utils.deleteRecursively(tempDir)
+  }
+
+  test("local-cluster mode") {
+    val output = runInterpreter(
+      """
+        |var v = 7
+        |def getV() = v
+        |val res1 = sc.parallelize(1 to 10).map(x => getV()).collect().reduceLeft(_+_)
+        |v = 10
+        |val res2 = sc.parallelize(1 to 10).map(x => getV()).collect().reduceLeft(_+_)
+        |var array = new Array[Int](5)
+        |val broadcastArray = sc.broadcast(array)
+        |val res3 = sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect()
+        |array(0) = 5
+        |val res4 = sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect()
+      """.stripMargin)
+    assertDoesNotContain("error:", output)
+    assertDoesNotContain("Exception", output)
+    assertContains("res1: Int = 70", output)
+    assertContains("res2: Int = 100", output)
+    assertContains("res3: Array[Int] = Array(0, 0, 0, 0, 0)", output)
+    assertContains("res4: Array[Int] = Array(0, 0, 0, 0, 0)", output)
+  }
+
+  test("SPARK-1199 two instances of same class don't type check.") {
+    val output = runInterpreter(
+      """
+        |case class Sum(exp: String, exp2: String)
+        |val a = Sum("A", "B")
+        |def b(a: Sum): String = a match { case Sum(_, _) => "Found Sum" }
+        |b(a)
+      """.stripMargin)
+    assertDoesNotContain("error:", output)
+    assertDoesNotContain("Exception", output)
+  }
+
+  test("SPARK-2452 compound statements.") {
+    val output = runInterpreter(
+      """
+        |val x = 4 ; def f() = x
+        |f()
+      """.stripMargin)
+    assertDoesNotContain("error:", output)
+    assertDoesNotContain("Exception", output)
+  }
+
+  test("SPARK-2576 importing implicits") {
+    // We need to use local-cluster to test this case.
+    val output = runInterpreter(
+      """
+        |import spark.implicits._
+        |case class TestCaseClass(value: Int)
+        |sc.parallelize(1 to 10).map(x => TestCaseClass(x)).toDF().collect()
+        |
+        |// Test Dataset Serialization in the REPL
+        |Seq(TestCaseClass(1)).toDS().collect()
+      """.stripMargin)
+    assertDoesNotContain("error:", output)
+    assertDoesNotContain("Exception", output)
+  }
+
+  test("Datasets and encoders") {
+    val output = runInterpreter(
+      """
+        |import org.apache.spark.sql.functions._
+        |import org.apache.spark.sql.{Encoder, Encoders}
+        |import org.apache.spark.sql.expressions.Aggregator
+        |import org.apache.spark.sql.TypedColumn
+        |val simpleSum = new Aggregator[Int, Int, Int] {
+        |  def zero: Int = 0                     // The initial value.
+        |  def reduce(b: Int, a: Int) = b + a    // Add an element to the running total
+        |  def merge(b1: Int, b2: Int) = b1 + b2 // Merge intermediate values.
+        |  def finish(b: Int) = b                // Return the final result.
+        |  def bufferEncoder: Encoder[Int] = Encoders.scalaInt
+        |  def outputEncoder: Encoder[Int] = Encoders.scalaInt
+        |}.toColumn
+        |
+        |val ds = Seq(1, 2, 3, 4).toDS()
+        |ds.select(simpleSum).collect
+      """.stripMargin)
+    assertDoesNotContain("error:", output)
+    assertDoesNotContain("Exception", output)
+  }
+
+  test("SPARK-2632 importing a method from non serializable class and not using it.") {
+    val output = runInterpreter(
+      """
+        |class TestClass() { def testMethod = 3 }
+        |val t = new TestClass
+        |import t.testMethod
+        |case class TestCaseClass(value: Int)
+        |sc.parallelize(1 to 10).map(x => TestCaseClass(x)).collect()
+      """.stripMargin)
+    assertDoesNotContain("error:", output)
+    assertDoesNotContain("Exception", output)
+  }
+
+  test("collecting objects of class defined in repl") {
+    val output = runInterpreter(
+      """
+        |case class Foo(i: Int)
+        |val res = sc.parallelize((1 to 100).map(Foo), 10).collect()
+      """.stripMargin)
+    assertDoesNotContain("error:", output)
+    assertDoesNotContain("Exception", output)
+    assertContains("res: Array[Foo] = Array(Foo(1),", output)
+  }
+
+  test("collecting objects of class defined in repl - shuffling") {
+    val output = runInterpreter(
+      """
+        |case class Foo(i: Int)
+        |val list = List((1, Foo(1)), (1, Foo(2)))
+        |val res = sc.parallelize(list).groupByKey().collect()
+      """.stripMargin)
+    assertDoesNotContain("error:", output)
+    assertDoesNotContain("Exception", output)
+    assertContains("res: Array[(Int, Iterable[Foo])] = Array((1,", output)
+  }
+
+  test("replicating blocks of object with class defined in repl") {
+    val output = runInterpreter(
+      """
+        |val timeout = 60000 // 60 seconds
+        |val start = System.currentTimeMillis
+        |while(sc.getExecutorStorageStatus.size != 3 &&
+        |    (System.currentTimeMillis - start) < timeout) {
+        |  Thread.sleep(10)
+        |}
+        |if (System.currentTimeMillis - start >= timeout) {
+        |  throw new java.util.concurrent.TimeoutException("Executors were not up in 60 seconds")
+        |}
+        |import org.apache.spark.storage.StorageLevel._
+        |case class Foo(i: Int)
+        |val ret = sc.parallelize((1 to 100).map(Foo), 10).persist(MEMORY_AND_DISK_2)
+        |ret.count()
+        |val res = sc.getExecutorStorageStatus.map(s => s.rddBlocksById(ret.id).size).sum
+      """.stripMargin)
+    assertDoesNotContain("error:", output)
+    assertDoesNotContain("Exception", output)
+    assertContains("res: Int = 20", output)
+  }
+
+  test("should clone and clean line object in ClosureCleaner") {
+    val output = runInterpreter(
+      """
+        |import org.apache.spark.rdd.RDD
+        |
+        |val lines = sc.textFile("pom.xml")
+        |case class Data(s: String)
+        |val dataRDD = lines.map(line => Data(line.take(3)))
+        |dataRDD.cache.count
+        |val repartitioned = dataRDD.repartition(dataRDD.partitions.size)
+        |repartitioned.cache.count
+        |
+        |def getCacheSize(rdd: RDD[_]) = {
+        |  sc.getRDDStorageInfo.filter(_.id == rdd.id).map(_.memSize).sum
+        |}
+        |val cacheSize1 = getCacheSize(dataRDD)
+        |val cacheSize2 = getCacheSize(repartitioned)
+        |
+        |// The cache size of dataRDD and the repartitioned one should be similar.
+        |val deviation = math.abs(cacheSize2 - cacheSize1).toDouble / cacheSize1
+        |assert(deviation < 0.2,
+        |  s"deviation too large: $deviation, first size: $cacheSize1, second size: $cacheSize2")
+      """.stripMargin)
+    assertDoesNotContain("AssertionError", output)
+    assertDoesNotContain("Exception", output)
+  }
+
+  test("newProductSeqEncoder with REPL defined class") {
+    val output = runInterpreter(
+      """
+        |case class Click(id: Int)
+        |spark.implicits.newProductSeqEncoder[Click]
+      """.stripMargin)
+
+    assertDoesNotContain("error:", output)
+    assertDoesNotContain("Exception", output)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/576c43fb/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Percentile.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Percentile.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Percentile.scala
index 8761ae4..4894036 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Percentile.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Percentile.scala
@@ -179,7 +179,7 @@ case class Percentile(
 
     val sortedCounts = buffer.toSeq.sortBy(_._1)(
       child.dataType.asInstanceOf[NumericType].ordering.asInstanceOf[Ordering[AnyRef]])
-    val accumlatedCounts = sortedCounts.scanLeft(sortedCounts.head._1, 0L) {
+    val accumlatedCounts = sortedCounts.scanLeft((sortedCounts.head._1, 0L)) {
       case ((key1, count1), (key2, count2)) => (key2, count1 + count2)
     }.tail
     val maxPosition = accumlatedCounts.last._2 - 1

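For context, a minimal standalone sketch of the one-line Percentile fix above, using made-up data: scanLeft takes a single seed element, so the seed tuple is written with explicit parentheses instead of leaving the compiler to adapt two arguments into a tuple.

    val sortedCounts = Seq(("a", 2L), ("b", 3L))
    // The seed is one tuple value, hence the explicit inner parentheses.
    val accumulatedCounts = sortedCounts.scanLeft((sortedCounts.head._1, 0L)) {
      case ((_, count1), (key2, count2)) => (key2, count1 + count2)
    }.tail
    // accumulatedCounts == Seq(("a", 2L), ("b", 5L))
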
http://git-wip-us.apache.org/repos/asf/spark/blob/576c43fb/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Metadata.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Metadata.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Metadata.scala
index 3aa4bf6..352fb54 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Metadata.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Metadata.scala
@@ -177,7 +177,7 @@ object Metadata {
   private def toJsonValue(obj: Any): JValue = {
     obj match {
       case map: Map[_, _] =>
-        val fields = map.toList.map { case (k: String, v) => (k, toJsonValue(v)) }
+        val fields = map.toList.map { case (k, v) => (k.toString, toJsonValue(v)) }
         JObject(fields)
       case arr: Array[_] =>
         val values = arr.toList.map(toJsonValue)

http://git-wip-us.apache.org/repos/asf/spark/blob/576c43fb/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala
index c35e563..65ca374 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala
@@ -96,7 +96,7 @@ case class GenerateExec(
           } else {
             outputRows.map(joinedRow.withRight)
           }
-        } ++ LazyIterator(boundGenerator.terminate).map { row =>
+        } ++ LazyIterator(() => boundGenerator.terminate()).map { row =>
           // we leave the left side as the last element of its child output
           // keep it the same as Hive does
           joinedRow.withRight(row)
@@ -109,7 +109,7 @@ case class GenerateExec(
           } else {
             outputRows
           }
-        } ++ LazyIterator(boundGenerator.terminate)
+        } ++ LazyIterator(() => boundGenerator.terminate())
       }
 
       // Convert the rows to unsafe rows.

http://git-wip-us.apache.org/repos/asf/spark/blob/576c43fb/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
index d74aae3..203d449 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
@@ -119,6 +119,7 @@ class InMemoryFileIndex(
         case None =>
           pathsToFetch += path
       }
+      Unit // for some reason scalac 2.12 needs this; the return type doesn't matter
     }
     val filter = FileInputFormat.getInputPathFilter(new JobConf(hadoopConf, this.getClass))
     val discovered = InMemoryFileIndex.bulkListLeafFiles(

http://git-wip-us.apache.org/repos/asf/spark/blob/576c43fb/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
index 0e41f3c..7d6d7e7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
@@ -205,7 +205,7 @@ class UnivocityParser(
       }
       throw BadRecordException(
         () => getCurrentInput,
-        getPartialResult,
+        () => getPartialResult(),
         new RuntimeException("Malformed CSV record"))
     } else {
       try {

http://git-wip-us.apache.org/repos/asf/spark/blob/576c43fb/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
index 1b6a28c..f8058b2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
@@ -216,7 +216,7 @@ private[joins] class UnsafeHashedRelation(
   }
 
   override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
-    read(in.readInt, in.readLong, in.readFully)
+    read(() => in.readInt(), () => in.readLong(), in.readFully)
   }
 
   private def read(
@@ -277,7 +277,7 @@ private[joins] class UnsafeHashedRelation(
   }
 
   override def read(kryo: Kryo, in: Input): Unit = Utils.tryOrIOException {
-    read(in.readInt, in.readLong, in.readBytes)
+    read(() => in.readInt(), () => in.readLong(), in.readBytes)
   }
 
   override def getAverageProbesPerLookup: Double = binaryMap.getAverageProbesPerLookup
@@ -766,11 +766,11 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
   }
 
   override def readExternal(in: ObjectInput): Unit = {
-    read(in.readBoolean, in.readLong, in.readFully)
+    read(() => in.readBoolean(), () => in.readLong(), in.readFully)
   }
 
   override def read(kryo: Kryo, in: Input): Unit = {
-    read(in.readBoolean, in.readLong, in.readBytes)
+    read(() => in.readBoolean(), () => in.readLong(), in.readBytes)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/576c43fb/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
----------------------------------------------------------------------
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
index 13b006f..c132cab 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
@@ -1334,6 +1334,10 @@ public class JavaDatasetSuite implements Serializable {
       return "BeanWithEnum(" + enumField  + ", " + regularField + ")";
     }
 
+    public int hashCode() {
+      return Objects.hashCode(enumField, regularField);
+    }
+
     public boolean equals(Object other) {
       if (other instanceof BeanWithEnum) {
         BeanWithEnum beanWithEnum = (BeanWithEnum) other;

http://git-wip-us.apache.org/repos/asf/spark/blob/576c43fb/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala
index 09502d0..247c30e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala
@@ -230,11 +230,9 @@ class DataFrameStatSuite extends QueryTest with SharedSQLContext {
 
     val resNaN1 = dfNaN.stat.approxQuantile("input1", Array(q1, q2), epsilon)
     assert(resNaN1.count(_.isNaN) === 0)
-    assert(resNaN1.count(_ == null) === 0)
 
     val resNaN2 = dfNaN.stat.approxQuantile("input2", Array(q1, q2), epsilon)
     assert(resNaN2.count(_.isNaN) === 0)
-    assert(resNaN2.count(_ == null) === 0)
 
     val resNaN3 = dfNaN.stat.approxQuantile("input3", Array(q1, q2), epsilon)
     assert(resNaN3.isEmpty)
@@ -242,7 +240,6 @@ class DataFrameStatSuite extends QueryTest with SharedSQLContext {
     val resNaNAll = dfNaN.stat.approxQuantile(Array("input1", "input2", "input3"),
       Array(q1, q2), epsilon)
     assert(resNaNAll.flatten.count(_.isNaN) === 0)
-    assert(resNaNAll.flatten.count(_ == null) === 0)
 
     assert(resNaN1(0) === resNaNAll(0)(0))
     assert(resNaN1(1) === resNaNAll(0)(1))

http://git-wip-us.apache.org/repos/asf/spark/blob/576c43fb/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
index 2986b7f..46eec73 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
@@ -289,7 +289,7 @@ class StreamingQueryManagerSuite extends StreamTest with BeforeAndAfter {
       }
     }
 
-    AwaitTerminationTester.test(expectedBehavior, awaitTermFunc, testBehaviorFor)
+    AwaitTerminationTester.test(expectedBehavior, () => awaitTermFunc(), testBehaviorFor)
   }
 
   /** Stop a random active query either with `stop()` or with an error */

http://git-wip-us.apache.org/repos/asf/spark/blob/576c43fb/sql/hive-thriftserver/pom.xml
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/pom.xml b/sql/hive-thriftserver/pom.xml
index a5a8e26..3135a8a 100644
--- a/sql/hive-thriftserver/pom.xml
+++ b/sql/hive-thriftserver/pom.xml
@@ -63,6 +63,16 @@
       <groupId>${hive.group}</groupId>
       <artifactId>hive-beeline</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.eclipse.jetty</groupId>
+      <artifactId>jetty-server</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.eclipse.jetty</groupId>
+      <artifactId>jetty-servlet</artifactId>
+      <scope>provided</scope>
+    </dependency>
     <!-- Added for selenium: -->
     <dependency>
       <groupId>org.seleniumhq.selenium</groupId>

http://git-wip-us.apache.org/repos/asf/spark/blob/576c43fb/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 8c7418e..0274038 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -596,7 +596,7 @@ class StreamingContext private[streaming] (
         }
         logDebug("Adding shutdown hook") // force eager creation of logger
         shutdownHookRef = ShutdownHookManager.addShutdownHook(
-          StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)
+          StreamingContext.SHUTDOWN_HOOK_PRIORITY)(() => stopOnShutdown())
         // Registering Streaming Metrics at the start of the StreamingContext
         assert(env.metricsSystem != null)
         env.metricsSystem.registerSource(streamingSource)




[2/2] spark git commit: [SPARK-22087][SPARK-14650][WIP][BUILD][REPL][CORE] Compile Spark REPL for Scala 2.12 + other 2.12 fixes

Posted by sr...@apache.org.
[SPARK-22087][SPARK-14650][WIP][BUILD][REPL][CORE] Compile Spark REPL for Scala 2.12 + other 2.12 fixes

## What changes were proposed in this pull request?

Enable Scala 2.12 REPL. Fix most remaining issues with 2.12 compilation and warnings, including:

- Selecting Kafka 0.10.1+ for Scala 2.12 and patching over a minor API difference
- Fixing lots of "eta expansion of zero arg method deprecated" warnings (the pattern is sketched below)
- Resolving the SparkContext.sequenceFile implicits compile problem
- Fixing an odd but genuine missing jetty-server dependency in hive-thriftserver
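
Most of these eta-expansion fixes follow one mechanical pattern: wherever an API expects a function of no arguments, a bare zero-arg method reference is replaced by an explicit function literal. A minimal standalone sketch of the pattern (runLater and checkForLogs are illustrative names, not Spark code):

    object EtaExpansionSketch {
      // An API that expects a function of no arguments, in the same way that
      // getRunner and LazyIterator are used elsewhere in this commit.
      def runLater(task: () => Unit): Unit = task()

      def checkForLogs(): Unit = println("checking for logs")

      def main(args: Array[String]): Unit = {
        // Scala 2.12 deprecates eta expansion of zero-arg methods, so passing the
        // bare method name (runLater(checkForLogs)) now warns; the commit rewrites
        // such call sites with an explicit function literal instead.
        runLater(() => checkForLogs())
      }
    }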

## How was this patch tested?

Existing tests

Author: Sean Owen <so...@cloudera.com>

Closes #19307 from srowen/Scala212.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/576c43fb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/576c43fb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/576c43fb

Branch: refs/heads/master
Commit: 576c43fb4226e4efa12189b41c3bc862019862c6
Parents: 4943ea5
Author: Sean Owen <so...@cloudera.com>
Authored: Sun Sep 24 09:40:13 2017 +0100
Committer: Sean Owen <so...@cloudera.com>
Committed: Sun Sep 24 09:40:13 2017 +0100

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   |  36 ++
 .../deploy/history/FsHistoryProvider.scala      |   8 +-
 .../org/apache/spark/deploy/worker/Worker.scala |  14 +-
 .../executor/CoarseGrainedExecutorBackend.scala |   4 +-
 .../spark/memory/UnifiedMemoryManager.scala     |   2 +-
 .../apache/spark/rdd/DoubleRDDFunctions.scala   |   6 +-
 .../apache/spark/rpc/netty/NettyRpcEnv.scala    |   2 +-
 .../apache/spark/scheduler/DAGScheduler.scala   |  14 +-
 .../cluster/CoarseGrainedSchedulerBackend.scala |  10 +-
 .../spark/serializer/KryoSerializer.scala       |   4 +-
 .../spark/storage/BlockManagerMaster.scala      |  23 +-
 .../storage/BlockManagerSlaveEndpoint.scala     |  10 +-
 .../apache/spark/ui/UIWorkloadGenerator.scala   |  14 +-
 .../spark/deploy/SparkSubmitUtilsSuite.scala    |  11 +-
 .../deploy/history/ApplicationCacheSuite.scala  |   4 +-
 .../apache/spark/rdd/AsyncRDDActionsSuite.scala |   8 +-
 .../scheduler/SchedulerIntegrationSuite.scala   |   2 +
 .../apache/spark/serializer/KryoBenchmark.scala |  16 +-
 examples/pom.xml                                |   2 +-
 .../flume/FlumePollingStreamSuite.scala         |   4 +-
 external/kafka-0-10-sql/pom.xml                 |  11 +
 .../spark/sql/kafka010/KafkaTestUtils.scala     |   5 +-
 external/kafka-0-10/pom.xml                     |  11 +
 .../scala/org/apache/spark/ml/Pipeline.scala    |   5 +-
 .../spark/ml/classification/LinearSVC.scala     |   2 +-
 .../ml/classification/LogisticRegression.scala  |   2 +-
 .../spark/ml/classification/OneVsRest.scala     |   2 +-
 .../spark/ml/regression/LinearRegression.scala  |   2 +-
 .../spark/ml/tree/impl/RandomForest.scala       |   6 +-
 .../spark/mllib/tree/impurity/Impurity.scala    |   2 +-
 .../DifferentiableLossAggregatorSuite.scala     |   4 +-
 .../spark/ml/tree/impl/RandomForestSuite.scala  |   6 +-
 repl/pom.xml                                    |   2 -
 .../main/scala/org/apache/spark/repl/Main.scala | 122 ------
 .../scala/org/apache/spark/repl/ReplSuite.scala | 220 ----------
 .../apache/spark/repl/SingletonReplSuite.scala  | 408 -------------------
 .../org/apache/spark/repl/SparkILoop.scala      | 134 ++++++
 .../main/scala/org/apache/spark/repl/Main.scala | 122 ++++++
 .../scala/org/apache/spark/repl/ReplSuite.scala | 220 ++++++++++
 .../apache/spark/repl/SingletonReplSuite.scala  | 408 +++++++++++++++++++
 .../expressions/aggregate/Percentile.scala      |   2 +-
 .../org/apache/spark/sql/types/Metadata.scala   |   2 +-
 .../spark/sql/execution/GenerateExec.scala      |   4 +-
 .../datasources/InMemoryFileIndex.scala         |   1 +
 .../datasources/csv/UnivocityParser.scala       |   2 +-
 .../sql/execution/joins/HashedRelation.scala    |   8 +-
 .../org/apache/spark/sql/JavaDatasetSuite.java  |   4 +
 .../apache/spark/sql/DataFrameStatSuite.scala   |   3 -
 .../streaming/StreamingQueryManagerSuite.scala  |   2 +-
 sql/hive-thriftserver/pom.xml                   |  10 +
 .../spark/streaming/StreamingContext.scala      |   2 +-
 51 files changed, 1066 insertions(+), 862 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/576c43fb/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 1821bc8..cec61d8 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -2826,6 +2826,42 @@ object WritableConverter {
   // them automatically. However, we still keep the old functions in SparkContext for backward
   // compatibility and forward to the following functions directly.
 
+  // The following implicit declarations have been added on top of the very similar ones
+  // below in order to enable compatibility with Scala 2.12. Scala 2.12 deprecates eta
+  // expansion of zero-arg methods and thus won't match a no-arg method where it expects
+  // an implicit that is a function of no args.
+
+  implicit val intWritableConverterFn: () => WritableConverter[Int] =
+    () => simpleWritableConverter[Int, IntWritable](_.get)
+
+  implicit val longWritableConverterFn: () => WritableConverter[Long] =
+    () => simpleWritableConverter[Long, LongWritable](_.get)
+
+  implicit val doubleWritableConverterFn: () => WritableConverter[Double] =
+    () => simpleWritableConverter[Double, DoubleWritable](_.get)
+
+  implicit val floatWritableConverterFn: () => WritableConverter[Float] =
+    () => simpleWritableConverter[Float, FloatWritable](_.get)
+
+  implicit val booleanWritableConverterFn: () => WritableConverter[Boolean] =
+    () => simpleWritableConverter[Boolean, BooleanWritable](_.get)
+
+  implicit val bytesWritableConverterFn: () => WritableConverter[Array[Byte]] = {
+    () => simpleWritableConverter[Array[Byte], BytesWritable] { bw =>
+      // getBytes returns an array that is longer than the data to be returned
+      Arrays.copyOfRange(bw.getBytes, 0, bw.getLength)
+    }
+  }
+
+  implicit val stringWritableConverterFn: () => WritableConverter[String] =
+    () => simpleWritableConverter[String, Text](_.toString)
+
+  implicit def writableWritableConverterFn[T <: Writable : ClassTag]: () => WritableConverter[T] =
+    () => new WritableConverter[T](_.runtimeClass.asInstanceOf[Class[T]], _.asInstanceOf[T])
+
+  // These implicits remain included for backwards-compatibility. They fulfill the
+  // same role as those above.
+
   implicit def intWritableConverter(): WritableConverter[Int] =
     simpleWritableConverter[Int, IntWritable](_.get)
 

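The comment block in the hunk above explains the reasoning: Scala 2.12 no longer matches a zero-arg implicit method where an implicit function of no args is expected, so function-typed implicit vals are added alongside the older implicit defs. A minimal self-contained sketch of that resolution pattern, with made-up names (Converter, needsConverter) standing in for WritableConverter and a sequenceFile-style signature:

    object ImplicitFnSketch {
      class Converter[T]

      // Stand-in for a sequenceFile-style method that expects an implicit
      // function of no args producing a converter.
      def needsConverter[T](implicit makeConverter: () => Converter[T]): Converter[T] =
        makeConverter()

      // Mirrors intWritableConverterFn above: an implicit val of function type
      // is declared explicitly rather than relying on eta expansion of a
      // zero-arg implicit def, which 2.12 deprecates.
      implicit val intConverterFn: () => Converter[Int] = () => new Converter[Int]

      def main(args: Array[String]): Unit = {
        println(needsConverter[Int]) // resolved via intConverterFn
      }
    }
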
http://git-wip-us.apache.org/repos/asf/spark/blob/576c43fb/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 20fe911..910121e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -218,11 +218,13 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     if (!conf.contains("spark.testing")) {
       // A task that periodically checks for event log updates on disk.
       logDebug(s"Scheduling update thread every $UPDATE_INTERVAL_S seconds")
-      pool.scheduleWithFixedDelay(getRunner(checkForLogs), 0, UPDATE_INTERVAL_S, TimeUnit.SECONDS)
+      pool.scheduleWithFixedDelay(
+        getRunner(() => checkForLogs()), 0, UPDATE_INTERVAL_S, TimeUnit.SECONDS)
 
       if (conf.getBoolean("spark.history.fs.cleaner.enabled", false)) {
         // A task that periodically cleans event logs on disk.
-        pool.scheduleWithFixedDelay(getRunner(cleanLogs), 0, CLEAN_INTERVAL_S, TimeUnit.SECONDS)
+        pool.scheduleWithFixedDelay(
+          getRunner(() => cleanLogs()), 0, CLEAN_INTERVAL_S, TimeUnit.SECONDS)
       }
     } else {
       logDebug("Background update thread disabled for testing")
@@ -268,7 +270,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
               appListener.adminAclsGroups.getOrElse("")
             ui.getSecurityManager.setAdminAclsGroups(adminAclsGroups)
             ui.getSecurityManager.setViewAclsGroups(appListener.viewAclsGroups.getOrElse(""))
-            Some(LoadedAppUI(ui, updateProbe(appId, attemptId, attempt.fileSize)))
+            Some(LoadedAppUI(ui, () => updateProbe(appId, attemptId, attempt.fileSize)))
           } else {
             None
           }

http://git-wip-us.apache.org/repos/asf/spark/blob/576c43fb/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 29a810f..ed5fa4b 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -450,10 +450,9 @@ private[deploy] class Worker(
         }
       }(cleanupThreadExecutor)
 
-      cleanupFuture.onFailure {
-        case e: Throwable =>
-          logError("App dir cleanup failed: " + e.getMessage, e)
-      }(cleanupThreadExecutor)
+      cleanupFuture.failed.foreach(e =>
+        logError("App dir cleanup failed: " + e.getMessage, e)
+      )(cleanupThreadExecutor)
 
     case MasterChanged(masterRef, masterWebUiUrl) =>
       logInfo("Master has changed, new master is at " + masterRef.address.toSparkURL)
@@ -622,10 +621,9 @@ private[deploy] class Worker(
           dirList.foreach { dir =>
             Utils.deleteRecursively(new File(dir))
           }
-        }(cleanupThreadExecutor).onFailure {
-          case e: Throwable =>
-            logError(s"Clean up app dir $dirList failed: ${e.getMessage}", e)
-        }(cleanupThreadExecutor)
+        }(cleanupThreadExecutor).failed.foreach(e =>
+          logError(s"Clean up app dir $dirList failed: ${e.getMessage}", e)
+        )(cleanupThreadExecutor)
       }
       shuffleService.applicationRemoved(id)
     }

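The two Worker hunks above show another recurring 2.12 migration in this commit: the deprecated Future.onFailure callback is replaced with the failed projection plus foreach. A minimal sketch of the same move, with an illustrative logOnFailure helper:

    import scala.concurrent.{ExecutionContext, Future}

    object FutureFailureSketch {
      // Before (deprecated in Scala 2.12):
      //   future.onFailure { case e: Throwable => println(e.getMessage) }(ec)
      // After, as in the hunks above:
      def logOnFailure[T](future: Future[T])(implicit ec: ExecutionContext): Unit =
        future.failed.foreach(e => println("operation failed: " + e.getMessage))
    }
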
http://git-wip-us.apache.org/repos/asf/spark/blob/576c43fb/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index ed893cd..d27362a 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -163,9 +163,9 @@ private[spark] class CoarseGrainedExecutorBackend(
     if (notifyDriver && driver.nonEmpty) {
       driver.get.ask[Boolean](
         RemoveExecutor(executorId, new ExecutorLossReason(reason))
-      ).onFailure { case e =>
+      ).failed.foreach(e =>
         logWarning(s"Unable to notify the driver due to " + e.getMessage, e)
-      }(ThreadUtils.sameThread)
+      )(ThreadUtils.sameThread)
     }
 
     System.exit(code)

http://git-wip-us.apache.org/repos/asf/spark/blob/576c43fb/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
index df19355..78edd2c 100644
--- a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
@@ -143,7 +143,7 @@ private[spark] class UnifiedMemoryManager private[memory] (
     }
 
     executionPool.acquireMemory(
-      numBytes, taskAttemptId, maybeGrowExecutionPool, computeMaxExecutionPoolSize)
+      numBytes, taskAttemptId, maybeGrowExecutionPool, () => computeMaxExecutionPoolSize)
   }
 
   override def acquireStorageMemory(

http://git-wip-us.apache.org/repos/asf/spark/blob/576c43fb/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
index 57782c0..943abae 100644
--- a/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
@@ -128,9 +128,9 @@ class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable {
     }
     // Compute the minimum and the maximum
     val (max: Double, min: Double) = self.mapPartitions { items =>
-      Iterator(items.foldRight(Double.NegativeInfinity,
-        Double.PositiveInfinity)((e: Double, x: (Double, Double)) =>
-        (x._1.max(e), x._2.min(e))))
+      Iterator(
+        items.foldRight((Double.NegativeInfinity, Double.PositiveInfinity)
+        )((e: Double, x: (Double, Double)) => (x._1.max(e), x._2.min(e))))
     }.reduce { (maxmin1, maxmin2) =>
       (maxmin1._1.max(maxmin2._1), maxmin1._2.min(maxmin2._2))
     }
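
The foldRight change is one of several places (treeAggregate and aggregate in MLlib and foldLeft in Impurity, later in this patch) where a tuple argument is now written explicitly rather than relying on the compiler to adapt two arguments into a pair, which newer compilers warn about. A short sketch of the same min/max fold over a plain Scala collection instead of an RDD partition:

    object MinMaxFoldSketch {
      def main(args: Array[String]): Unit = {
        val items = Seq(3.0, -1.5, 7.2, 0.0)
        // Explicit (NegativeInfinity, PositiveInfinity) zero value; the two-argument form
        // foldRight(Double.NegativeInfinity, Double.PositiveInfinity)(...) relied on the
        // argument list being adapted into a tuple.
        val (max, min) = items.foldRight((Double.NegativeInfinity, Double.PositiveInfinity)) {
          (e: Double, x: (Double, Double)) => (x._1.max(e), x._2.min(e))
        }
        println(s"max=$max min=$min") // max=7.2 min=-1.5
      }
    }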

http://git-wip-us.apache.org/repos/asf/spark/blob/576c43fb/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
index 1777e7a..f951591 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
@@ -232,7 +232,7 @@ private[netty] class NettyRpcEnv(
           onFailure,
           (client, response) => onSuccess(deserialize[Any](client, response)))
         postToOutbox(message.receiver, rpcMessage)
-        promise.future.onFailure {
+        promise.future.failed.foreach {
           case _: TimeoutException => rpcMessage.onTimeout()
           case _ =>
         }(ThreadUtils.sameThread)
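
One nuance of the NettyRpcEnv change: onFailure accepted a PartialFunction, but failed.foreach takes a total Throwable => U, so a body that still pattern-matches has to keep its catch-all case (the case _ => above). A minimal sketch:

    import java.util.concurrent.TimeoutException

    import scala.concurrent.{ExecutionContext, Promise}

    object TotalFailureHandlerSketch {
      def main(args: Array[String]): Unit = {
        val promise = Promise[String]()
        promise.tryFailure(new TimeoutException("no reply within the ask timeout"))

        // failed.foreach needs a total function, so the match keeps an explicit catch-all.
        promise.future.failed.foreach {
          case _: TimeoutException => println("timed out; cancel the pending outbox message")
          case _ => // other failures are surfaced through the promise itself
        }(ExecutionContext.global)

        Thread.sleep(200) // let the callback run before the JVM exits
      }
    }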

http://git-wip-us.apache.org/repos/asf/spark/blob/576c43fb/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 562dd1d..9153751 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -24,7 +24,7 @@ import java.util.concurrent.atomic.AtomicInteger
 
 import scala.annotation.tailrec
 import scala.collection.Map
-import scala.collection.mutable.{HashMap, HashSet, Stack}
+import scala.collection.mutable.{ArrayStack, HashMap, HashSet}
 import scala.concurrent.duration._
 import scala.language.existentials
 import scala.language.postfixOps
@@ -396,12 +396,12 @@ class DAGScheduler(
 
   /** Find ancestor shuffle dependencies that are not registered in shuffleToMapStage yet */
   private def getMissingAncestorShuffleDependencies(
-      rdd: RDD[_]): Stack[ShuffleDependency[_, _, _]] = {
-    val ancestors = new Stack[ShuffleDependency[_, _, _]]
+      rdd: RDD[_]): ArrayStack[ShuffleDependency[_, _, _]] = {
+    val ancestors = new ArrayStack[ShuffleDependency[_, _, _]]
     val visited = new HashSet[RDD[_]]
     // We are manually maintaining a stack here to prevent StackOverflowError
     // caused by recursively visiting
-    val waitingForVisit = new Stack[RDD[_]]
+    val waitingForVisit = new ArrayStack[RDD[_]]
     waitingForVisit.push(rdd)
     while (waitingForVisit.nonEmpty) {
       val toVisit = waitingForVisit.pop()
@@ -434,7 +434,7 @@ class DAGScheduler(
       rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {
     val parents = new HashSet[ShuffleDependency[_, _, _]]
     val visited = new HashSet[RDD[_]]
-    val waitingForVisit = new Stack[RDD[_]]
+    val waitingForVisit = new ArrayStack[RDD[_]]
     waitingForVisit.push(rdd)
     while (waitingForVisit.nonEmpty) {
       val toVisit = waitingForVisit.pop()
@@ -456,7 +456,7 @@ class DAGScheduler(
     val visited = new HashSet[RDD[_]]
     // We are manually maintaining a stack here to prevent StackOverflowError
     // caused by recursively visiting
-    val waitingForVisit = new Stack[RDD[_]]
+    val waitingForVisit = new ArrayStack[RDD[_]]
     def visit(rdd: RDD[_]) {
       if (!visited(rdd)) {
         visited += rdd
@@ -1633,7 +1633,7 @@ class DAGScheduler(
     val visitedRdds = new HashSet[RDD[_]]
     // We are manually maintaining a stack here to prevent StackOverflowError
     // caused by recursively visiting
-    val waitingForVisit = new Stack[RDD[_]]
+    val waitingForVisit = new ArrayStack[RDD[_]]
     def visit(rdd: RDD[_]) {
       if (!visitedRdds(rdd)) {
         visitedRdds += rdd
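
All of the manually maintained traversal stacks in DAGScheduler (and the node stack in RandomForest further down) move from scala.collection.mutable.Stack to mutable.ArrayStack, an array-backed stack with the same push/pop/nonEmpty surface. A minimal sketch of the iterative-traversal pattern with ArrayStack, over a toy tree rather than RDD lineage:

    import scala.collection.mutable

    object ArrayStackTraversalSketch {
      final case class Node(id: Int, children: Seq[Node] = Nil)

      // Iterative DFS with an explicit stack, mirroring how DAGScheduler avoids
      // StackOverflowError on deeply nested lineage.
      def visitAll(root: Node): Seq[Int] = {
        val visited = new mutable.HashSet[Int]
        val order = Seq.newBuilder[Int]
        val waitingForVisit = new mutable.ArrayStack[Node]
        waitingForVisit.push(root)
        while (waitingForVisit.nonEmpty) {
          val node = waitingForVisit.pop()
          if (!visited(node.id)) {
            visited += node.id
            order += node.id
            node.children.foreach(waitingForVisit.push)
          }
        }
        order.result()
      }

      def main(args: Array[String]): Unit = {
        val tree = Node(1, Seq(Node(2, Seq(Node(4))), Node(3)))
        println(visitAll(tree)) // List(1, 3, 2, 4): children come back in reverse push order
      }
    }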

http://git-wip-us.apache.org/repos/asf/spark/blob/576c43fb/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index a0ef209..424e43b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -471,15 +471,13 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
    */
   protected def removeExecutor(executorId: String, reason: ExecutorLossReason): Unit = {
     // Only log the failure since we don't care about the result.
-    driverEndpoint.ask[Boolean](RemoveExecutor(executorId, reason)).onFailure {
-      case t => logError(t.getMessage, t)
-    }(ThreadUtils.sameThread)
+    driverEndpoint.ask[Boolean](RemoveExecutor(executorId, reason)).failed.foreach(t =>
+      logError(t.getMessage, t))(ThreadUtils.sameThread)
   }
 
   protected def removeWorker(workerId: String, host: String, message: String): Unit = {
-    driverEndpoint.ask[Boolean](RemoveWorker(workerId, host, message)).onFailure {
-      case t => logError(t.getMessage, t)
-    }(ThreadUtils.sameThread)
+    driverEndpoint.ask[Boolean](RemoveWorker(workerId, host, message)).failed.foreach(t =>
+      logError(t.getMessage, t))(ThreadUtils.sameThread)
   }
 
   def sufficientResourcesRegistered(): Boolean = true

http://git-wip-us.apache.org/repos/asf/spark/blob/576c43fb/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index 4f03e54..58483c9 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -501,8 +501,8 @@ private class JavaIterableWrapperSerializer
 private object JavaIterableWrapperSerializer extends Logging {
   // The class returned by JavaConverters.asJava
   // (scala.collection.convert.Wrappers$IterableWrapper).
-  val wrapperClass =
-    scala.collection.convert.WrapAsJava.asJavaIterable(Seq(1)).getClass
+  import scala.collection.JavaConverters._
+  val wrapperClass = Seq(1).asJava.getClass
 
   // Get the underlying method so we can use it to get the Scala collection for serialization.
   private val underlyingMethodOpt = {
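
scala.collection.convert.WrapAsJava is deprecated in Scala 2.12, so the serializer now obtains the wrapper class through scala.collection.JavaConverters. A standalone sketch of the asJava decoration, independent of Kryo:

    import scala.collection.JavaConverters._

    object AsJavaSketch {
      def main(args: Array[String]): Unit = {
        // Decorate a Scala Iterable as a java.lang.Iterable; the runtime class is one of
        // the scala.collection.convert.Wrappers classes that the serializer special-cases.
        val wrapped: java.lang.Iterable[Int] = (Seq(1, 2, 3): Iterable[Int]).asJava
        println(wrapped.getClass.getName) // scala.collection.convert.Wrappers$IterableWrapper
        val it = wrapped.iterator()
        while (it.hasNext) {
          println(it.next())
        }
      }
    }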

http://git-wip-us.apache.org/repos/asf/spark/blob/576c43fb/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
index ea5d842..8b1dc0b 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -118,10 +118,9 @@ class BlockManagerMaster(
   /** Remove all blocks belonging to the given RDD. */
   def removeRdd(rddId: Int, blocking: Boolean) {
     val future = driverEndpoint.askSync[Future[Seq[Int]]](RemoveRdd(rddId))
-    future.onFailure {
-      case e: Exception =>
-        logWarning(s"Failed to remove RDD $rddId - ${e.getMessage}", e)
-    }(ThreadUtils.sameThread)
+    future.failed.foreach(e =>
+      logWarning(s"Failed to remove RDD $rddId - ${e.getMessage}", e)
+    )(ThreadUtils.sameThread)
     if (blocking) {
       timeout.awaitResult(future)
     }
@@ -130,10 +129,9 @@ class BlockManagerMaster(
   /** Remove all blocks belonging to the given shuffle. */
   def removeShuffle(shuffleId: Int, blocking: Boolean) {
     val future = driverEndpoint.askSync[Future[Seq[Boolean]]](RemoveShuffle(shuffleId))
-    future.onFailure {
-      case e: Exception =>
-        logWarning(s"Failed to remove shuffle $shuffleId - ${e.getMessage}", e)
-    }(ThreadUtils.sameThread)
+    future.failed.foreach(e =>
+      logWarning(s"Failed to remove shuffle $shuffleId - ${e.getMessage}", e)
+    )(ThreadUtils.sameThread)
     if (blocking) {
       timeout.awaitResult(future)
     }
@@ -143,11 +141,10 @@ class BlockManagerMaster(
   def removeBroadcast(broadcastId: Long, removeFromMaster: Boolean, blocking: Boolean) {
     val future = driverEndpoint.askSync[Future[Seq[Int]]](
       RemoveBroadcast(broadcastId, removeFromMaster))
-    future.onFailure {
-      case e: Exception =>
-        logWarning(s"Failed to remove broadcast $broadcastId" +
-          s" with removeFromMaster = $removeFromMaster - ${e.getMessage}", e)
-    }(ThreadUtils.sameThread)
+    future.failed.foreach(e =>
+      logWarning(s"Failed to remove broadcast $broadcastId" +
+        s" with removeFromMaster = $removeFromMaster - ${e.getMessage}", e)
+    )(ThreadUtils.sameThread)
     if (blocking) {
       timeout.awaitResult(future)
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/576c43fb/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala
index 1aaa424..742cf4f 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala
@@ -85,13 +85,13 @@ class BlockManagerSlaveEndpoint(
       logDebug(actionMessage)
       body
     }
-    future.onSuccess { case response =>
-      logDebug("Done " + actionMessage + ", response is " + response)
+    future.foreach { response =>
+      logDebug(s"Done $actionMessage, response is $response")
       context.reply(response)
-      logDebug("Sent response: " + response + " to " + context.senderAddress)
+      logDebug(s"Sent response: $response to ${context.senderAddress}")
     }
-    future.onFailure { case t: Throwable =>
-      logError("Error in " + actionMessage, t)
+    future.failed.foreach { t =>
+      logError(s"Error in $actionMessage", t)
       context.sendFailure(t)
     }
   }
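
This is the success-side counterpart of the same migration: onSuccess becomes Future.foreach and onFailure becomes failed.foreach, with string concatenation switched to interpolation while the lines are being touched. An equivalent non-deprecated option, not used here but worth keeping in mind, is a single onComplete callback over Try; a minimal sketch:

    import scala.concurrent.Future
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.util.{Failure, Success}

    object OnCompleteSketch {
      def main(args: Array[String]): Unit = {
        val reply: Future[String] = Future("42 blocks removed")
        // One callback for both outcomes, instead of separate foreach / failed.foreach.
        reply.onComplete {
          case Success(r) => println(s"Done, response is $r")
          case Failure(t) => println(s"Error: ${t.getMessage}")
        }
        Thread.sleep(500) // let the callback fire before the JVM exits
      }
    }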

http://git-wip-us.apache.org/repos/asf/spark/blob/576c43fb/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala b/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala
index 094953f..6229e80 100644
--- a/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala
+++ b/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala
@@ -66,11 +66,11 @@ private[spark] object UIWorkloadGenerator {
     def nextFloat(): Float = new Random().nextFloat()
 
     val jobs = Seq[(String, () => Long)](
-      ("Count", baseData.count),
-      ("Cache and Count", baseData.map(x => x).cache().count),
-      ("Single Shuffle", baseData.map(x => (x % 10, x)).reduceByKey(_ + _).count),
-      ("Entirely failed phase", baseData.map(x => throw new Exception).count),
-      ("Partially failed phase", {
+      ("Count", () => baseData.count),
+      ("Cache and Count", () => baseData.map(x => x).cache().count),
+      ("Single Shuffle", () => baseData.map(x => (x % 10, x)).reduceByKey(_ + _).count),
+      ("Entirely failed phase", () => baseData.map { x => throw new Exception(); 1 }.count),
+      ("Partially failed phase", () => {
         baseData.map{x =>
           val probFailure = (4.0 / NUM_PARTITIONS)
           if (nextFloat() < probFailure) {
@@ -79,7 +79,7 @@ private[spark] object UIWorkloadGenerator {
           1
         }.count
       }),
-      ("Partially failed phase (longer tasks)", {
+      ("Partially failed phase (longer tasks)", () => {
         baseData.map{x =>
           val probFailure = (4.0 / NUM_PARTITIONS)
           if (nextFloat() < probFailure) {
@@ -89,7 +89,7 @@ private[spark] object UIWorkloadGenerator {
           1
         }.count
       }),
-      ("Job with delays", baseData.map(x => Thread.sleep(100)).count)
+      ("Job with delays", () => baseData.map(x => Thread.sleep(100)).count)
     )
 
     val barrier = new Semaphore(-nJobSet * jobs.size + 1)

http://git-wip-us.apache.org/repos/asf/spark/blob/576c43fb/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala
index 88b77e5..eb8c203 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala
@@ -83,11 +83,12 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {
     val resolver = settings.getDefaultResolver.asInstanceOf[ChainResolver]
     assert(resolver.getResolvers.size() === 4)
     val expected = repos.split(",").map(r => s"$r/")
-    resolver.getResolvers.toArray.zipWithIndex.foreach { case (resolver: AbstractResolver, i) =>
-      if (1 < i && i < 3) {
-        assert(resolver.getName === s"repo-$i")
-        assert(resolver.asInstanceOf[IBiblioResolver].getRoot === expected(i - 1))
-      }
+    resolver.getResolvers.toArray.map(_.asInstanceOf[AbstractResolver]).zipWithIndex.foreach {
+      case (r, i) =>
+        if (1 < i && i < 3) {
+          assert(r.getName === s"repo-$i")
+          assert(r.asInstanceOf[IBiblioResolver].getRoot === expected(i - 1))
+        }
     }
   }
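
Here and in the Pipeline and OneVsRest writers later in the patch, a cast that used to live inside the pattern (case (resolver: AbstractResolver, i) =>) is hoisted into an explicit map, leaving a total destructuring pattern that the 2.12 build is happy with. A tiny sketch of the same reshaping over a plain Array[AnyRef], which is what a Java toArray call hands back:

    object CastThenDestructureSketch {
      def main(args: Array[String]): Unit = {
        // Stand-in for a Java API returning Object[], like ChainResolver.getResolvers.toArray.
        val raw: Array[AnyRef] = Array[AnyRef]("repo-0", "repo-1", "repo-2")

        // Old shape: raw.zipWithIndex.foreach { case (s: String, i) => ... }
        // New shape: cast once up front, keep the pattern a plain destructuring.
        raw.map(_.asInstanceOf[String]).zipWithIndex.foreach { case (s, i) =>
          println(s"$i -> $s")
        }
      }
    }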
 

http://git-wip-us.apache.org/repos/asf/spark/blob/576c43fb/core/src/test/scala/org/apache/spark/deploy/history/ApplicationCacheSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/ApplicationCacheSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/ApplicationCacheSuite.scala
index c175ed3..6e50e84 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/ApplicationCacheSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/ApplicationCacheSuite.scala
@@ -78,7 +78,7 @@ class ApplicationCacheSuite extends SparkFunSuite with Logging with MockitoSugar
       logDebug(s"getAppUI($appId, $attemptId)")
       getAppUICount += 1
       instances.get(CacheKey(appId, attemptId)).map( e =>
-        LoadedAppUI(e.ui, updateProbe(appId, attemptId, e.probeTime)))
+        LoadedAppUI(e.ui, () => updateProbe(appId, attemptId, e.probeTime)))
     }
 
     override def attachSparkUI(
@@ -122,7 +122,7 @@ class ApplicationCacheSuite extends SparkFunSuite with Logging with MockitoSugar
         completed: Boolean,
         timestamp: Long): Unit = {
       instances += (CacheKey(appId, attemptId) ->
-          new CacheEntry(ui, completed, updateProbe(appId, attemptId, timestamp), timestamp))
+          new CacheEntry(ui, completed, () => updateProbe(appId, attemptId, timestamp), timestamp))
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/spark/blob/576c43fb/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala
index f4be8ea..de0e71a 100644
--- a/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala
@@ -130,10 +130,10 @@ class AsyncRDDActionsSuite extends SparkFunSuite with BeforeAndAfterAll with Tim
         info("Should not have reached this code path (onComplete matching Failure)")
         throw new Exception("Task should succeed")
     }
-    f.onSuccess { case a: Any =>
+    f.foreach { a =>
       sem.release()
     }
-    f.onFailure { case t =>
+    f.failed.foreach { t =>
       info("Should not have reached this code path (onFailure)")
       throw new Exception("Task should succeed")
     }
@@ -164,11 +164,11 @@ class AsyncRDDActionsSuite extends SparkFunSuite with BeforeAndAfterAll with Tim
       case scala.util.Failure(e) =>
         sem.release()
     }
-    f.onSuccess { case a: Any =>
+    f.foreach { a =>
       info("Should not have reached this code path (onSuccess)")
       throw new Exception("Task should fail")
     }
-    f.onFailure { case t =>
+    f.failed.foreach { t =>
       sem.release()
     }
     intercept[SparkException] {

http://git-wip-us.apache.org/repos/asf/spark/blob/576c43fb/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
index a8249e1..75ea409 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
@@ -625,6 +625,8 @@ class BasicSchedulerIntegrationSuite extends SchedulerIntegrationSuite[SingleCor
           backend.taskFailed(taskDescription, fetchFailed)
         case (1, _, partition) =>
           backend.taskSuccess(taskDescription, 42 + partition)
+        case unmatched =>
+          fail(s"Unexpected shuffle output $unmatched")
       }
     }
     withBackend(runBackend _) {
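
The added catch-all turns an unexpected (attempt, stage, partition) combination into an explicit test failure instead of a MatchError escaping from the backend thread. The same defensive shape in isolation, with sys.error standing in for ScalaTest's fail:

    object ExhaustiveBackendSketch {
      // Decide what a fake backend should do with a task, covering every input explicitly.
      def handle(stageAttempt: Int, partition: Int): String =
        (stageAttempt, partition) match {
          case (0, p) if p < 2 => s"fail partition $p with a fetch failure"
          case (1, p)          => s"complete partition $p successfully"
          case unmatched       => sys.error(s"Unexpected task $unmatched")
        }

      def main(args: Array[String]): Unit = {
        println(handle(1, 3))
      }
    }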

http://git-wip-us.apache.org/repos/asf/spark/blob/576c43fb/core/src/test/scala/org/apache/spark/serializer/KryoBenchmark.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoBenchmark.scala b/core/src/test/scala/org/apache/spark/serializer/KryoBenchmark.scala
index 64be966..a1cf357 100644
--- a/core/src/test/scala/org/apache/spark/serializer/KryoBenchmark.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/KryoBenchmark.scala
@@ -78,10 +78,10 @@ class KryoBenchmark extends SparkFunSuite {
         sum
       }
     }
-    basicTypes("Int", Random.nextInt)
-    basicTypes("Long", Random.nextLong)
-    basicTypes("Float", Random.nextFloat)
-    basicTypes("Double", Random.nextDouble)
+    basicTypes("Int", () => Random.nextInt())
+    basicTypes("Long", () => Random.nextLong())
+    basicTypes("Float", () => Random.nextFloat())
+    basicTypes("Double", () => Random.nextDouble())
 
     // Benchmark Array of Primitives
     val arrayCount = 10000
@@ -101,10 +101,10 @@ class KryoBenchmark extends SparkFunSuite {
         sum
       }
     }
-    basicTypeArray("Int", Random.nextInt)
-    basicTypeArray("Long", Random.nextLong)
-    basicTypeArray("Float", Random.nextFloat)
-    basicTypeArray("Double", Random.nextDouble)
+    basicTypeArray("Int", () => Random.nextInt())
+    basicTypeArray("Long", () => Random.nextLong())
+    basicTypeArray("Float", () => Random.nextFloat())
+    basicTypeArray("Double", () => Random.nextDouble())
 
     // Benchmark Maps
     val mapsCount = 1000

http://git-wip-us.apache.org/repos/asf/spark/blob/576c43fb/examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/pom.xml b/examples/pom.xml
index 33eca48..52a6764 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -114,7 +114,7 @@
     <dependency>
       <groupId>com.github.scopt</groupId>
       <artifactId>scopt_${scala.binary.version}</artifactId>
-      <version>3.3.0</version>
+      <version>3.7.0</version>
     </dependency>
     <dependency>
       <groupId>com.twitter</groupId>

http://git-wip-us.apache.org/repos/asf/spark/blob/576c43fb/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
----------------------------------------------------------------------
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
index 1c93079..4324cc6 100644
--- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
@@ -61,11 +61,11 @@ class FlumePollingStreamSuite extends SparkFunSuite with BeforeAndAfterAll with
   }
 
   test("flume polling test") {
-    testMultipleTimes(testFlumePolling)
+    testMultipleTimes(() => testFlumePolling())
   }
 
   test("flume polling test multiple hosts") {
-    testMultipleTimes(testFlumePollingMultipleHost)
+    testMultipleTimes(() => testFlumePollingMultipleHost())
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/576c43fb/external/kafka-0-10-sql/pom.xml
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/pom.xml b/external/kafka-0-10-sql/pom.xml
index 0f61a10..0c9f0aa 100644
--- a/external/kafka-0-10-sql/pom.xml
+++ b/external/kafka-0-10-sql/pom.xml
@@ -102,8 +102,19 @@
     </dependency>
 
   </dependencies>
+
   <build>
     <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
     <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
   </build>
+
+  <profiles>
+    <profile>
+      <id>scala-2.12</id>
+      <properties>
+        <kafka.version>0.10.1.1</kafka.version>
+      </properties>
+    </profile>
+  </profiles>
+
 </project>

http://git-wip-us.apache.org/repos/asf/spark/blob/576c43fb/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
index 066a68a..2df8352 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
@@ -173,7 +173,10 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
         AdminUtils.createTopic(zkUtils, topic, partitions, 1)
         created = true
       } catch {
-        case e: kafka.common.TopicExistsException if overwrite => deleteTopic(topic)
+        // Workaround fact that TopicExistsException is in kafka.common in 0.10.0 and
+        // org.apache.kafka.common.errors in 0.10.1 (!)
+        case e: Exception if (e.getClass.getSimpleName == "TopicExistsException") && overwrite =>
+          deleteTopic(topic)
       }
     }
     // wait until metadata is propagated
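
The catch clause now matches TopicExistsException by its simple class name because the class lives in kafka.common in Kafka 0.10.0 but in org.apache.kafka.common.errors in 0.10.1, and this module compiles against different Kafka versions depending on the profile. A self-contained sketch of the name-based matching trick, with an illustrative exception class rather than Kafka's:

    // Stand-in for an exception type whose package differs across library versions.
    class TopicExistsException(msg: String) extends RuntimeException(msg)

    object MatchByClassNameSketch {
      def createTopic(overwrite: Boolean)(body: => Unit): Unit =
        try body catch {
          // Bridge the package move by comparing the simple class name, as the patch does.
          case e: Exception if e.getClass.getSimpleName == "TopicExistsException" && overwrite =>
            println(s"Topic exists, would delete and retry: ${e.getMessage}")
        }

      def main(args: Array[String]): Unit =
        createTopic(overwrite = true) {
          throw new TopicExistsException("demo-topic")
        }
    }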

http://git-wip-us.apache.org/repos/asf/spark/blob/576c43fb/external/kafka-0-10/pom.xml
----------------------------------------------------------------------
diff --git a/external/kafka-0-10/pom.xml b/external/kafka-0-10/pom.xml
index 4d9861a..6eb7ba5 100644
--- a/external/kafka-0-10/pom.xml
+++ b/external/kafka-0-10/pom.xml
@@ -87,8 +87,19 @@
     </dependency>
 
   </dependencies>
+
   <build>
     <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
     <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
   </build>
+
+  <profiles>
+    <profile>
+      <id>scala-2.12</id>
+      <properties>
+        <kafka.version>0.10.1.1</kafka.version>
+      </properties>
+    </profile>
+  </profiles>
+
 </project>

http://git-wip-us.apache.org/repos/asf/spark/blob/576c43fb/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala b/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala
index b76dc5f..103082b 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala
@@ -250,8 +250,9 @@ object Pipeline extends MLReadable[Pipeline] {
 
       // Save stages
       val stagesDir = new Path(path, "stages").toString
-      stages.zipWithIndex.foreach { case (stage: MLWritable, idx: Int) =>
-        stage.write.save(getStagePath(stage.uid, idx, stages.length, stagesDir))
+      stages.zipWithIndex.foreach { case (stage, idx) =>
+        stage.asInstanceOf[MLWritable].write.save(
+          getStagePath(stage.uid, idx, stages.length, stagesDir))
       }
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/576c43fb/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala
index 1c97d77..ce400f4 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala
@@ -184,7 +184,7 @@ class LinearSVC @Since("2.2.0") (
           (c1._1.merge(c2._1), c1._2.merge(c2._2))
 
       instances.treeAggregate(
-        new MultivariateOnlineSummarizer, new MultiClassSummarizer
+        (new MultivariateOnlineSummarizer, new MultiClassSummarizer)
       )(seqOp, combOp, $(aggregationDepth))
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/576c43fb/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
index cbc8f4a..fa19160 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
@@ -514,7 +514,7 @@ class LogisticRegression @Since("1.2.0") (
           (c1._1.merge(c2._1), c1._2.merge(c2._2))
 
       instances.treeAggregate(
-        new MultivariateOnlineSummarizer, new MultiClassSummarizer
+        (new MultivariateOnlineSummarizer, new MultiClassSummarizer)
       )(seqOp, combOp, $(aggregationDepth))
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/576c43fb/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala
index 92a7742..3ab99b3 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala
@@ -235,7 +235,7 @@ object OneVsRestModel extends MLReadable[OneVsRestModel] {
       val extraJson = ("labelMetadata" -> instance.labelMetadata.json) ~
         ("numClasses" -> instance.models.length)
       OneVsRestParams.saveImpl(path, instance, sc, Some(extraJson))
-      instance.models.zipWithIndex.foreach { case (model: MLWritable, idx) =>
+      instance.models.map(_.asInstanceOf[MLWritable]).zipWithIndex.foreach { case (model, idx) =>
         val modelPath = new Path(path, s"model_$idx").toString
         model.save(modelPath)
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/576c43fb/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
index b2a9681..df1aa60 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
@@ -265,7 +265,7 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
           (c1._1.merge(c2._1), c1._2.merge(c2._2))
 
       instances.treeAggregate(
-        new MultivariateOnlineSummarizer, new MultivariateOnlineSummarizer
+        (new MultivariateOnlineSummarizer, new MultivariateOnlineSummarizer)
       )(seqOp, combOp, $(aggregationDepth))
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/576c43fb/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala b/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala
index f7d969f..acfc639 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala
@@ -169,7 +169,7 @@ private[spark] object RandomForest extends Logging {
       training the same tree in the next iteration.  This focus allows us to send fewer trees to
       workers on each iteration; see topNodesForGroup below.
      */
-    val nodeStack = new mutable.Stack[(Int, LearningNode)]
+    val nodeStack = new mutable.ArrayStack[(Int, LearningNode)]
 
     val rng = new Random()
     rng.setSeed(seed)
@@ -367,7 +367,7 @@ private[spark] object RandomForest extends Logging {
       nodesForGroup: Map[Int, Array[LearningNode]],
       treeToNodeToIndexInfo: Map[Int, Map[Int, NodeIndexInfo]],
       splits: Array[Array[Split]],
-      nodeStack: mutable.Stack[(Int, LearningNode)],
+      nodeStack: mutable.ArrayStack[(Int, LearningNode)],
       timer: TimeTracker = new TimeTracker,
       nodeIdCache: Option[NodeIdCache] = None): Unit = {
 
@@ -1076,7 +1076,7 @@ private[spark] object RandomForest extends Logging {
    *          The feature indices are None if not subsampling features.
    */
   private[tree] def selectNodesToSplit(
-      nodeStack: mutable.Stack[(Int, LearningNode)],
+      nodeStack: mutable.ArrayStack[(Int, LearningNode)],
       maxMemoryUsage: Long,
       metadata: DecisionTreeMetadata,
       rng: Random): (Map[Int, Array[LearningNode]], Map[Int, Map[Int, NodeIndexInfo]]) = {

http://git-wip-us.apache.org/repos/asf/spark/blob/576c43fb/mllib/src/main/scala/org/apache/spark/mllib/tree/impurity/Impurity.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/impurity/Impurity.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/impurity/Impurity.scala
index 4c77468..f151a6a 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/tree/impurity/Impurity.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/impurity/Impurity.scala
@@ -162,7 +162,7 @@ private[spark] abstract class ImpurityCalculator(val stats: Array[Double]) exten
    * Fails if the array is empty.
    */
   protected def indexOfLargestArrayElement(array: Array[Double]): Int = {
-    val result = array.foldLeft(-1, Double.MinValue, 0) {
+    val result = array.foldLeft((-1, Double.MinValue, 0)) {
       case ((maxIndex, maxValue, currentIndex), currentValue) =>
         if (currentValue > maxValue) {
           (currentIndex, currentValue, currentIndex + 1)

http://git-wip-us.apache.org/repos/asf/spark/blob/576c43fb/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/DifferentiableLossAggregatorSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/DifferentiableLossAggregatorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/DifferentiableLossAggregatorSuite.scala
index d7cdeae..9fddf09 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/DifferentiableLossAggregatorSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/DifferentiableLossAggregatorSuite.scala
@@ -174,7 +174,7 @@ object DifferentiableLossAggregatorSuite {
       (c1._1.merge(c2._1), c1._2.merge(c2._2))
 
     instances.aggregate(
-      new MultivariateOnlineSummarizer, new MultivariateOnlineSummarizer
+      (new MultivariateOnlineSummarizer, new MultivariateOnlineSummarizer)
     )(seqOp, combOp)
   }
 
@@ -191,7 +191,7 @@ object DifferentiableLossAggregatorSuite {
       (c1._1.merge(c2._1), c1._2.merge(c2._2))
 
     instances.aggregate(
-      new MultivariateOnlineSummarizer, new MultiClassSummarizer
+      (new MultivariateOnlineSummarizer, new MultiClassSummarizer)
     )(seqOp, combOp)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/576c43fb/mllib/src/test/scala/org/apache/spark/ml/tree/impl/RandomForestSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/ml/tree/impl/RandomForestSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/tree/impl/RandomForestSuite.scala
index df155b4..dbe2ea9 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/tree/impl/RandomForestSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/tree/impl/RandomForestSuite.scala
@@ -324,7 +324,7 @@ class RandomForestSuite extends SparkFunSuite with MLlibTestSparkContext {
     val treeToNodeToIndexInfo = Map((0, Map(
       (topNode.id, new RandomForest.NodeIndexInfo(0, None))
     )))
-    val nodeStack = new mutable.Stack[(Int, LearningNode)]
+    val nodeStack = new mutable.ArrayStack[(Int, LearningNode)]
     RandomForest.findBestSplits(baggedInput, metadata, Map(0 -> topNode),
       nodesForGroup, treeToNodeToIndexInfo, splits, nodeStack)
 
@@ -366,7 +366,7 @@ class RandomForestSuite extends SparkFunSuite with MLlibTestSparkContext {
     val treeToNodeToIndexInfo = Map((0, Map(
       (topNode.id, new RandomForest.NodeIndexInfo(0, None))
     )))
-    val nodeStack = new mutable.Stack[(Int, LearningNode)]
+    val nodeStack = new mutable.ArrayStack[(Int, LearningNode)]
     RandomForest.findBestSplits(baggedInput, metadata, Map(0 -> topNode),
       nodesForGroup, treeToNodeToIndexInfo, splits, nodeStack)
 
@@ -478,7 +478,7 @@ class RandomForestSuite extends SparkFunSuite with MLlibTestSparkContext {
         val failString = s"Failed on test with:" +
           s"numTrees=$numTrees, featureSubsetStrategy=$featureSubsetStrategy," +
           s" numFeaturesPerNode=$numFeaturesPerNode, seed=$seed"
-        val nodeStack = new mutable.Stack[(Int, LearningNode)]
+        val nodeStack = new mutable.ArrayStack[(Int, LearningNode)]
         val topNodes: Array[LearningNode] = new Array[LearningNode](numTrees)
         Range(0, numTrees).foreach { treeIndex =>
           topNodes(treeIndex) = LearningNode.emptyNode(nodeIndex = 1)

http://git-wip-us.apache.org/repos/asf/spark/blob/576c43fb/repl/pom.xml
----------------------------------------------------------------------
diff --git a/repl/pom.xml b/repl/pom.xml
index 51eb9b6..bd2cfc4 100644
--- a/repl/pom.xml
+++ b/repl/pom.xml
@@ -171,7 +171,6 @@
     </plugins>
   </build>
   
-  <!--
   <profiles>
     <profile>
       <id>scala-2.12</id>
@@ -181,6 +180,5 @@
       </properties>
     </profile>
   </profiles>
-  -->
 
 </project>

http://git-wip-us.apache.org/repos/asf/spark/blob/576c43fb/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala
----------------------------------------------------------------------
diff --git a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala
deleted file mode 100644
index cc76a70..0000000
--- a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.repl
-
-import java.io.File
-import java.net.URI
-import java.util.Locale
-
-import scala.tools.nsc.GenericRunnerSettings
-
-import org.apache.spark._
-import org.apache.spark.internal.Logging
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
-import org.apache.spark.util.Utils
-
-object Main extends Logging {
-
-  initializeLogIfNecessary(true)
-  Signaling.cancelOnInterrupt()
-
-  val conf = new SparkConf()
-  val rootDir = conf.getOption("spark.repl.classdir").getOrElse(Utils.getLocalDir(conf))
-  val outputDir = Utils.createTempDir(root = rootDir, namePrefix = "repl")
-
-  var sparkContext: SparkContext = _
-  var sparkSession: SparkSession = _
-  // this is a public var because tests reset it.
-  var interp: SparkILoop = _
-
-  private var hasErrors = false
-
-  private def scalaOptionError(msg: String): Unit = {
-    hasErrors = true
-    // scalastyle:off println
-    Console.err.println(msg)
-    // scalastyle:on println
-  }
-
-  def main(args: Array[String]) {
-    doMain(args, new SparkILoop)
-  }
-
-  // Visible for testing
-  private[repl] def doMain(args: Array[String], _interp: SparkILoop): Unit = {
-    interp = _interp
-    val jars = Utils.getLocalUserJarsForShell(conf)
-      // Remove file:///, file:// or file:/ scheme if exists for each jar
-      .map { x => if (x.startsWith("file:")) new File(new URI(x)).getPath else x }
-      .mkString(File.pathSeparator)
-    val interpArguments = List(
-      "-Yrepl-class-based",
-      "-Yrepl-outdir", s"${outputDir.getAbsolutePath}",
-      "-classpath", jars
-    ) ++ args.toList
-
-    val settings = new GenericRunnerSettings(scalaOptionError)
-    settings.processArguments(interpArguments, true)
-
-    if (!hasErrors) {
-      interp.process(settings) // Repl starts and goes in loop of R.E.P.L
-      Option(sparkContext).foreach(_.stop)
-    }
-  }
-
-  def createSparkSession(): SparkSession = {
-    val execUri = System.getenv("SPARK_EXECUTOR_URI")
-    conf.setIfMissing("spark.app.name", "Spark shell")
-    // SparkContext will detect this configuration and register it with the RpcEnv's
-    // file server, setting spark.repl.class.uri to the actual URI for executors to
-    // use. This is sort of ugly but since executors are started as part of SparkContext
-    // initialization in certain cases, there's an initialization order issue that prevents
-    // this from being set after SparkContext is instantiated.
-    conf.set("spark.repl.class.outputDir", outputDir.getAbsolutePath())
-    if (execUri != null) {
-      conf.set("spark.executor.uri", execUri)
-    }
-    if (System.getenv("SPARK_HOME") != null) {
-      conf.setSparkHome(System.getenv("SPARK_HOME"))
-    }
-
-    val builder = SparkSession.builder.config(conf)
-    if (conf.get(CATALOG_IMPLEMENTATION.key, "hive").toLowerCase(Locale.ROOT) == "hive") {
-      if (SparkSession.hiveClassesArePresent) {
-        // In the case that the property is not set at all, builder's config
-        // does not have this value set to 'hive' yet. The original default
-        // behavior is that when there are hive classes, we use hive catalog.
-        sparkSession = builder.enableHiveSupport().getOrCreate()
-        logInfo("Created Spark session with Hive support")
-      } else {
-        // Need to change it back to 'in-memory' if no hive classes are found
-        // in the case that the property is set to hive in spark-defaults.conf
-        builder.config(CATALOG_IMPLEMENTATION.key, "in-memory")
-        sparkSession = builder.getOrCreate()
-        logInfo("Created Spark session")
-      }
-    } else {
-      // In the case that the property is set but not to 'hive', the internal
-      // default is 'in-memory'. So the sparkSession will use in-memory catalog.
-      sparkSession = builder.getOrCreate()
-      logInfo("Created Spark session")
-    }
-    sparkContext = sparkSession.sparkContext
-    sparkSession
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/576c43fb/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
----------------------------------------------------------------------
diff --git a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
deleted file mode 100644
index c7ae194..0000000
--- a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
+++ /dev/null
@@ -1,220 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.repl
-
-import java.io._
-import java.net.URLClassLoader
-
-import scala.collection.mutable.ArrayBuffer
-
-import org.apache.log4j.{Level, LogManager}
-
-import org.apache.spark.{SparkContext, SparkFunSuite}
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
-
-class ReplSuite extends SparkFunSuite {
-
-  def runInterpreter(master: String, input: String): String = {
-    val CONF_EXECUTOR_CLASSPATH = "spark.executor.extraClassPath"
-
-    val in = new BufferedReader(new StringReader(input + "\n"))
-    val out = new StringWriter()
-    val cl = getClass.getClassLoader
-    var paths = new ArrayBuffer[String]
-    if (cl.isInstanceOf[URLClassLoader]) {
-      val urlLoader = cl.asInstanceOf[URLClassLoader]
-      for (url <- urlLoader.getURLs) {
-        if (url.getProtocol == "file") {
-          paths += url.getFile
-        }
-      }
-    }
-    val classpath = paths.map(new File(_).getAbsolutePath).mkString(File.pathSeparator)
-
-    val oldExecutorClasspath = System.getProperty(CONF_EXECUTOR_CLASSPATH)
-    System.setProperty(CONF_EXECUTOR_CLASSPATH, classpath)
-    Main.sparkContext = null
-    Main.sparkSession = null // causes recreation of SparkContext for each test.
-    Main.conf.set("spark.master", master)
-    Main.doMain(Array("-classpath", classpath), new SparkILoop(in, new PrintWriter(out)))
-
-    if (oldExecutorClasspath != null) {
-      System.setProperty(CONF_EXECUTOR_CLASSPATH, oldExecutorClasspath)
-    } else {
-      System.clearProperty(CONF_EXECUTOR_CLASSPATH)
-    }
-    return out.toString
-  }
-
-  // Simulate the paste mode in Scala REPL.
-  def runInterpreterInPasteMode(master: String, input: String): String =
-    runInterpreter(master, ":paste\n" + input + 4.toChar) // 4 is the ascii code of CTRL + D
-
-  def assertContains(message: String, output: String) {
-    val isContain = output.contains(message)
-    assert(isContain,
-      "Interpreter output did not contain '" + message + "':\n" + output)
-  }
-
-  def assertDoesNotContain(message: String, output: String) {
-    val isContain = output.contains(message)
-    assert(!isContain,
-      "Interpreter output contained '" + message + "':\n" + output)
-  }
-
-  test("propagation of local properties") {
-    // A mock ILoop that doesn't install the SIGINT handler.
-    class ILoop(out: PrintWriter) extends SparkILoop(None, out) {
-      settings = new scala.tools.nsc.Settings
-      settings.usejavacp.value = true
-      org.apache.spark.repl.Main.interp = this
-    }
-
-    val out = new StringWriter()
-    Main.interp = new ILoop(new PrintWriter(out))
-    Main.sparkContext = new SparkContext("local", "repl-test")
-    Main.interp.createInterpreter()
-
-    Main.sparkContext.setLocalProperty("someKey", "someValue")
-
-    // Make sure the value we set in the caller to interpret is propagated in the thread that
-    // interprets the command.
-    Main.interp.interpret("org.apache.spark.repl.Main.sparkContext.getLocalProperty(\"someKey\")")
-    assert(out.toString.contains("someValue"))
-
-    Main.sparkContext.stop()
-    System.clearProperty("spark.driver.port")
-  }
-
-  test("SPARK-15236: use Hive catalog") {
-    // turn on the INFO log so that it is possible the code will dump INFO
-    // entry for using "HiveMetastore"
-    val rootLogger = LogManager.getRootLogger()
-    val logLevel = rootLogger.getLevel
-    rootLogger.setLevel(Level.INFO)
-    try {
-      Main.conf.set(CATALOG_IMPLEMENTATION.key, "hive")
-      val output = runInterpreter("local",
-        """
-      |spark.sql("drop table if exists t_15236")
-    """.stripMargin)
-      assertDoesNotContain("error:", output)
-      assertDoesNotContain("Exception", output)
-      // only when the config is set to hive and
-      // hive classes are built, we will use hive catalog.
-      // Then log INFO entry will show things using HiveMetastore
-      if (SparkSession.hiveClassesArePresent) {
-        assertContains("HiveMetaStore", output)
-      } else {
-        // If hive classes are not built, in-memory catalog will be used
-        assertDoesNotContain("HiveMetaStore", output)
-      }
-    } finally {
-      rootLogger.setLevel(logLevel)
-    }
-  }
-
-  test("SPARK-15236: use in-memory catalog") {
-    val rootLogger = LogManager.getRootLogger()
-    val logLevel = rootLogger.getLevel
-    rootLogger.setLevel(Level.INFO)
-    try {
-      Main.conf.set(CATALOG_IMPLEMENTATION.key, "in-memory")
-      val output = runInterpreter("local",
-        """
-          |spark.sql("drop table if exists t_16236")
-        """.stripMargin)
-      assertDoesNotContain("error:", output)
-      assertDoesNotContain("Exception", output)
-      assertDoesNotContain("HiveMetaStore", output)
-    } finally {
-      rootLogger.setLevel(logLevel)
-    }
-  }
-
-  test("broadcast vars") {
-    // Test that the value that a broadcast var had when it was created is used,
-    // even if that variable is then modified in the driver program
-    // TODO: This doesn't actually work for arrays when we run in local mode!
-    val output = runInterpreter("local",
-      """
-        |var array = new Array[Int](5)
-        |val broadcastArray = sc.broadcast(array)
-        |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect()
-        |array(0) = 5
-        |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect()
-      """.stripMargin)
-    assertDoesNotContain("error:", output)
-    assertDoesNotContain("Exception", output)
-    assertContains("res0: Array[Int] = Array(0, 0, 0, 0, 0)", output)
-    assertContains("res2: Array[Int] = Array(5, 0, 0, 0, 0)", output)
-  }
-
-  if (System.getenv("MESOS_NATIVE_JAVA_LIBRARY") != null) {
-    test("running on Mesos") {
-      val output = runInterpreter("localquiet",
-        """
-          |var v = 7
-          |def getV() = v
-          |sc.parallelize(1 to 10).map(x => getV()).collect().reduceLeft(_+_)
-          |v = 10
-          |sc.parallelize(1 to 10).map(x => getV()).collect().reduceLeft(_+_)
-          |var array = new Array[Int](5)
-          |val broadcastArray = sc.broadcast(array)
-          |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect()
-          |array(0) = 5
-          |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect()
-        """.stripMargin)
-      assertDoesNotContain("error:", output)
-      assertDoesNotContain("Exception", output)
-      assertContains("res0: Int = 70", output)
-      assertContains("res1: Int = 100", output)
-      assertContains("res2: Array[Int] = Array(0, 0, 0, 0, 0)", output)
-      assertContains("res4: Array[Int] = Array(0, 0, 0, 0, 0)", output)
-    }
-  }
-
-  test("line wrapper only initialized once when used as encoder outer scope") {
-    val output = runInterpreter("local",
-      """
-        |val fileName = "repl-test-" + System.currentTimeMillis
-        |val tmpDir = System.getProperty("java.io.tmpdir")
-        |val file = new java.io.File(tmpDir, fileName)
-        |def createFile(): Unit = file.createNewFile()
-        |
-        |createFile();case class TestCaseClass(value: Int)
-        |sc.parallelize(1 to 10).map(x => TestCaseClass(x)).collect()
-        |
-        |file.delete()
-      """.stripMargin)
-    assertDoesNotContain("error:", output)
-    assertDoesNotContain("Exception", output)
-  }
-
-  test("define case class and create Dataset together with paste mode") {
-    val output = runInterpreterInPasteMode("local-cluster[1,1,1024]",
-      """
-        |import spark.implicits._
-        |case class TestClass(value: Int)
-        |Seq(TestClass(1)).toDS()
-      """.stripMargin)
-    assertDoesNotContain("error:", output)
-    assertDoesNotContain("Exception", output)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/576c43fb/repl/scala-2.11/src/test/scala/org/apache/spark/repl/SingletonReplSuite.scala
----------------------------------------------------------------------
diff --git a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/SingletonReplSuite.scala b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/SingletonReplSuite.scala
deleted file mode 100644
index ec3d790..0000000
--- a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/SingletonReplSuite.scala
+++ /dev/null
@@ -1,408 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.repl
-
-import java.io._
-import java.net.URLClassLoader
-
-import scala.collection.mutable.ArrayBuffer
-
-import org.apache.commons.lang3.StringEscapeUtils
-
-import org.apache.spark.SparkFunSuite
-import org.apache.spark.util.Utils
-
-/**
- * A special test suite for REPL that all test cases share one REPL instance.
- */
-class SingletonReplSuite extends SparkFunSuite {
-
-  private val out = new StringWriter()
-  private val in = new PipedOutputStream()
-  private var thread: Thread = _
-
-  private val CONF_EXECUTOR_CLASSPATH = "spark.executor.extraClassPath"
-  private val oldExecutorClasspath = System.getProperty(CONF_EXECUTOR_CLASSPATH)
-
-  override def beforeAll(): Unit = {
-    super.beforeAll()
-
-    val cl = getClass.getClassLoader
-    var paths = new ArrayBuffer[String]
-    if (cl.isInstanceOf[URLClassLoader]) {
-      val urlLoader = cl.asInstanceOf[URLClassLoader]
-      for (url <- urlLoader.getURLs) {
-        if (url.getProtocol == "file") {
-          paths += url.getFile
-        }
-      }
-    }
-    val classpath = paths.map(new File(_).getAbsolutePath).mkString(File.pathSeparator)
-
-    System.setProperty(CONF_EXECUTOR_CLASSPATH, classpath)
-    Main.conf.set("spark.master", "local-cluster[2,1,1024]")
-    val interp = new SparkILoop(
-      new BufferedReader(new InputStreamReader(new PipedInputStream(in))),
-      new PrintWriter(out))
-
-    // Forces to create new SparkContext
-    Main.sparkContext = null
-    Main.sparkSession = null
-
-    // Starts a new thread to run the REPL interpreter, so that we won't block.
-    thread = new Thread(new Runnable {
-      override def run(): Unit = Main.doMain(Array("-classpath", classpath), interp)
-    })
-    thread.setDaemon(true)
-    thread.start()
-
-    waitUntil(() => out.toString.contains("Type :help for more information"))
-  }
-
-  override def afterAll(): Unit = {
-    in.close()
-    thread.join()
-    if (oldExecutorClasspath != null) {
-      System.setProperty(CONF_EXECUTOR_CLASSPATH, oldExecutorClasspath)
-    } else {
-      System.clearProperty(CONF_EXECUTOR_CLASSPATH)
-    }
-    super.afterAll()
-  }
-
-  private def waitUntil(cond: () => Boolean): Unit = {
-    import scala.concurrent.duration._
-    import org.scalatest.concurrent.Eventually._
-
-    eventually(timeout(50.seconds), interval(500.millis)) {
-      assert(cond(), "current output: " + out.toString)
-    }
-  }
-
-  /**
-   * Run the given commands string in a globally shared interpreter instance. Note that the given
-   * commands should not crash the interpreter, to not affect other test cases.
-   */
-  def runInterpreter(input: String): String = {
-    val currentOffset = out.getBuffer.length()
-    // append a special statement to the end of the given code, so that we can know what's
-    // the final output of this code snippet and rely on it to wait until the output is ready.
-    val timestamp = System.currentTimeMillis()
-    in.write((input + s"\nval _result_$timestamp = 1\n").getBytes)
-    in.flush()
-    val stopMessage = s"_result_$timestamp: Int = 1"
-    waitUntil(() => out.getBuffer.substring(currentOffset).contains(stopMessage))
-    out.getBuffer.substring(currentOffset)
-  }
-
-  def assertContains(message: String, output: String) {
-    val isContain = output.contains(message)
-    assert(isContain,
-      "Interpreter output did not contain '" + message + "':\n" + output)
-  }
-
-  def assertDoesNotContain(message: String, output: String) {
-    val isContain = output.contains(message)
-    assert(!isContain,
-      "Interpreter output contained '" + message + "':\n" + output)
-  }
-
-  test("simple foreach with accumulator") {
-    val output = runInterpreter(
-      """
-        |val accum = sc.longAccumulator
-        |sc.parallelize(1 to 10).foreach(x => accum.add(x))
-        |val res = accum.value
-      """.stripMargin)
-    assertDoesNotContain("error:", output)
-    assertDoesNotContain("Exception", output)
-    assertContains("res: Long = 55", output)
-  }
-
-  test("external vars") {
-    val output = runInterpreter(
-      """
-        |var v = 7
-        |val res1 = sc.parallelize(1 to 10).map(x => v).collect().reduceLeft(_+_)
-        |v = 10
-        |val res2 = sc.parallelize(1 to 10).map(x => v).collect().reduceLeft(_+_)
-      """.stripMargin)
-    assertDoesNotContain("error:", output)
-    assertDoesNotContain("Exception", output)
-    assertContains("res1: Int = 70", output)
-    assertContains("res2: Int = 100", output)
-  }
-
-  test("external classes") {
-    val output = runInterpreter(
-      """
-        |class C {
-        |def foo = 5
-        |}
-        |val res = sc.parallelize(1 to 10).map(x => (new C).foo).collect().reduceLeft(_+_)
-      """.stripMargin)
-    assertDoesNotContain("error:", output)
-    assertDoesNotContain("Exception", output)
-    assertContains("res: Int = 50", output)
-  }
-
-  test("external functions") {
-    val output = runInterpreter(
-      """
-        |def double(x: Int) = x + x
-        |val res = sc.parallelize(1 to 10).map(x => double(x)).collect().reduceLeft(_+_)
-      """.stripMargin)
-    assertDoesNotContain("error:", output)
-    assertDoesNotContain("Exception", output)
-    assertContains("res: Int = 110", output)
-  }
-
-  test("external functions that access vars") {
-    val output = runInterpreter(
-      """
-        |var v = 7
-        |def getV() = v
-        |val res1 = sc.parallelize(1 to 10).map(x => getV()).collect().reduceLeft(_+_)
-        |v = 10
-        |val res2 = sc.parallelize(1 to 10).map(x => getV()).collect().reduceLeft(_+_)
-      """.stripMargin)
-    assertDoesNotContain("error:", output)
-    assertDoesNotContain("Exception", output)
-    assertContains("res1: Int = 70", output)
-    assertContains("res2: Int = 100", output)
-  }
-
-  test("broadcast vars") {
-    // Test that the value that a broadcast var had when it was created is used,
-    // even if that variable is then modified in the driver program
-    val output = runInterpreter(
-      """
-        |var array = new Array[Int](5)
-        |val broadcastArray = sc.broadcast(array)
-        |val res1 = sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect()
-        |array(0) = 5
-        |val res2 = sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect()
-      """.stripMargin)
-    assertDoesNotContain("error:", output)
-    assertDoesNotContain("Exception", output)
-    assertContains("res1: Array[Int] = Array(0, 0, 0, 0, 0)", output)
-    assertContains("res2: Array[Int] = Array(0, 0, 0, 0, 0)", output)
-  }
-
-  test("interacting with files") {
-    val tempDir = Utils.createTempDir()
-    val out = new FileWriter(tempDir + "/input")
-    out.write("Hello world!\n")
-    out.write("What's up?\n")
-    out.write("Goodbye\n")
-    out.close()
-    val output = runInterpreter(
-      """
-        |var file = sc.textFile("%s").cache()
-        |val res1 = file.count()
-        |val res2 = file.count()
-        |val res3 = file.count()
-      """.stripMargin.format(StringEscapeUtils.escapeJava(
-        tempDir.getAbsolutePath + File.separator + "input")))
-    assertDoesNotContain("error:", output)
-    assertDoesNotContain("Exception", output)
-    assertContains("res1: Long = 3", output)
-    assertContains("res2: Long = 3", output)
-    assertContains("res3: Long = 3", output)
-    Utils.deleteRecursively(tempDir)
-  }
-
-  test("local-cluster mode") {
-    val output = runInterpreter(
-      """
-        |var v = 7
-        |def getV() = v
-        |val res1 = sc.parallelize(1 to 10).map(x => getV()).collect().reduceLeft(_+_)
-        |v = 10
-        |val res2 = sc.parallelize(1 to 10).map(x => getV()).collect().reduceLeft(_+_)
-        |var array = new Array[Int](5)
-        |val broadcastArray = sc.broadcast(array)
-        |val res3 = sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect()
-        |array(0) = 5
-        |val res4 = sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect()
-      """.stripMargin)
-    assertDoesNotContain("error:", output)
-    assertDoesNotContain("Exception", output)
-    assertContains("res1: Int = 70", output)
-    assertContains("res2: Int = 100", output)
-    assertContains("res3: Array[Int] = Array(0, 0, 0, 0, 0)", output)
-    assertContains("res4: Array[Int] = Array(0, 0, 0, 0, 0)", output)
-  }
-
-  test("SPARK-1199 two instances of same class don't type check.") {
-    val output = runInterpreter(
-      """
-        |case class Sum(exp: String, exp2: String)
-        |val a = Sum("A", "B")
-        |def b(a: Sum): String = a match { case Sum(_, _) => "Found Sum" }
-        |b(a)
-      """.stripMargin)
-    assertDoesNotContain("error:", output)
-    assertDoesNotContain("Exception", output)
-  }
-
-  test("SPARK-2452 compound statements.") {
-    val output = runInterpreter(
-      """
-        |val x = 4 ; def f() = x
-        |f()
-      """.stripMargin)
-    assertDoesNotContain("error:", output)
-    assertDoesNotContain("Exception", output)
-  }
-
-  test("SPARK-2576 importing implicits") {
-    // We need to use local-cluster to test this case.
-    val output = runInterpreter(
-      """
-        |import spark.implicits._
-        |case class TestCaseClass(value: Int)
-        |sc.parallelize(1 to 10).map(x => TestCaseClass(x)).toDF().collect()
-        |
-        |// Test Dataset Serialization in the REPL
-        |Seq(TestCaseClass(1)).toDS().collect()
-      """.stripMargin)
-    assertDoesNotContain("error:", output)
-    assertDoesNotContain("Exception", output)
-  }
-
-  test("Datasets and encoders") {
-    val output = runInterpreter(
-      """
-        |import org.apache.spark.sql.functions._
-        |import org.apache.spark.sql.{Encoder, Encoders}
-        |import org.apache.spark.sql.expressions.Aggregator
-        |import org.apache.spark.sql.TypedColumn
-        |val simpleSum = new Aggregator[Int, Int, Int] {
-        |  def zero: Int = 0                     // The initial value.
-        |  def reduce(b: Int, a: Int) = b + a    // Add an element to the running total
-        |  def merge(b1: Int, b2: Int) = b1 + b2 // Merge intermediate values.
-        |  def finish(b: Int) = b                // Return the final result.
-        |  def bufferEncoder: Encoder[Int] = Encoders.scalaInt
-        |  def outputEncoder: Encoder[Int] = Encoders.scalaInt
-        |}.toColumn
-        |
-        |val ds = Seq(1, 2, 3, 4).toDS()
-        |ds.select(simpleSum).collect
-      """.stripMargin)
-    assertDoesNotContain("error:", output)
-    assertDoesNotContain("Exception", output)
-  }
-
-  test("SPARK-2632 importing a method from non serializable class and not using it.") {
-    val output = runInterpreter(
-      """
-        |class TestClass() { def testMethod = 3 }
-        |val t = new TestClass
-        |import t.testMethod
-        |case class TestCaseClass(value: Int)
-        |sc.parallelize(1 to 10).map(x => TestCaseClass(x)).collect()
-      """.stripMargin)
-    assertDoesNotContain("error:", output)
-    assertDoesNotContain("Exception", output)
-  }
-
-  test("collecting objects of class defined in repl") {
-    val output = runInterpreter(
-      """
-        |case class Foo(i: Int)
-        |val res = sc.parallelize((1 to 100).map(Foo), 10).collect()
-      """.stripMargin)
-    assertDoesNotContain("error:", output)
-    assertDoesNotContain("Exception", output)
-    assertContains("res: Array[Foo] = Array(Foo(1),", output)
-  }
-
-  test("collecting objects of class defined in repl - shuffling") {
-    val output = runInterpreter(
-      """
-        |case class Foo(i: Int)
-        |val list = List((1, Foo(1)), (1, Foo(2)))
-        |val res = sc.parallelize(list).groupByKey().collect()
-      """.stripMargin)
-    assertDoesNotContain("error:", output)
-    assertDoesNotContain("Exception", output)
-    assertContains("res: Array[(Int, Iterable[Foo])] = Array((1,", output)
-  }
-
-  test("replicating blocks of object with class defined in repl") {
-    val output = runInterpreter(
-      """
-        |val timeout = 60000 // 60 seconds
-        |val start = System.currentTimeMillis
-        |while(sc.getExecutorStorageStatus.size != 3 &&
-        |    (System.currentTimeMillis - start) < timeout) {
-        |  Thread.sleep(10)
-        |}
-        |if (System.currentTimeMillis - start >= timeout) {
-        |  throw new java.util.concurrent.TimeoutException("Executors were not up in 60 seconds")
-        |}
-        |import org.apache.spark.storage.StorageLevel._
-        |case class Foo(i: Int)
-        |val ret = sc.parallelize((1 to 100).map(Foo), 10).persist(MEMORY_AND_DISK_2)
-        |ret.count()
-        |val res = sc.getExecutorStorageStatus.map(s => s.rddBlocksById(ret.id).size).sum
-      """.stripMargin)
-    assertDoesNotContain("error:", output)
-    assertDoesNotContain("Exception", output)
-    assertContains("res: Int = 20", output)
-  }
-
-  test("should clone and clean line object in ClosureCleaner") {
-    val output = runInterpreter(
-      """
-        |import org.apache.spark.rdd.RDD
-        |
-        |val lines = sc.textFile("pom.xml")
-        |case class Data(s: String)
-        |val dataRDD = lines.map(line => Data(line.take(3)))
-        |dataRDD.cache.count
-        |val repartitioned = dataRDD.repartition(dataRDD.partitions.size)
-        |repartitioned.cache.count
-        |
-        |def getCacheSize(rdd: RDD[_]) = {
-        |  sc.getRDDStorageInfo.filter(_.id == rdd.id).map(_.memSize).sum
-        |}
-        |val cacheSize1 = getCacheSize(dataRDD)
-        |val cacheSize2 = getCacheSize(repartitioned)
-        |
-        |// The cache size of dataRDD and the repartitioned one should be similar.
-        |val deviation = math.abs(cacheSize2 - cacheSize1).toDouble / cacheSize1
-        |assert(deviation < 0.2,
-        |  s"deviation too large: $deviation, first size: $cacheSize1, second size: $cacheSize2")
-      """.stripMargin)
-    assertDoesNotContain("AssertionError", output)
-    assertDoesNotContain("Exception", output)
-  }
-
-  test("newProductSeqEncoder with REPL defined class") {
-    val output = runInterpreter(
-      """
-        |case class Click(id: Int)
-        |spark.implicits.newProductSeqEncoder[Click]
-      """.stripMargin)
-
-    assertDoesNotContain("error:", output)
-    assertDoesNotContain("Exception", output)
-  }
-}
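
The suite removed above drives one globally shared REPL through a pipe and synchronises on a
sentinel value rather than on timing. As a minimal sketch of that mechanism only (the `in` and
`out` parameters are assumed to stand for the suite's PipedOutputStream and StringWriter fields,
and the class name is invented for illustration), the pattern boils down to:

  import java.io.{PipedOutputStream, StringWriter}

  import scala.concurrent.duration._

  import org.scalatest.concurrent.Eventually._

  // Sketch of the sentinel-based wait used by the shared-interpreter test harness.
  class SentinelRepl(in: PipedOutputStream, out: StringWriter) {
    def run(input: String): String = {
      val offset = out.getBuffer.length()
      // The sentinel definition is echoed back by the REPL only after everything
      // before it has been evaluated, so its appearance marks completion.
      val sentinel = s"_result_${System.currentTimeMillis()}"
      in.write((input + s"\nval $sentinel = 1\n").getBytes)
      in.flush()
      eventually(timeout(50.seconds), interval(500.millis)) {
        assert(out.getBuffer.substring(offset).contains(s"$sentinel: Int = 1"))
      }
      out.getBuffer.substring(offset)
    }
  }

Because every test case shares the one interpreter, the echoed sentinel is the only reliable
signal that the submitted snippet has finished evaluating before the output is inspected.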

http://git-wip-us.apache.org/repos/asf/spark/blob/576c43fb/repl/scala-2.12/src/main/scala/org/apache/spark/repl/SparkILoop.scala
----------------------------------------------------------------------
diff --git a/repl/scala-2.12/src/main/scala/org/apache/spark/repl/SparkILoop.scala b/repl/scala-2.12/src/main/scala/org/apache/spark/repl/SparkILoop.scala
new file mode 100644
index 0000000..4135940
--- /dev/null
+++ b/repl/scala-2.12/src/main/scala/org/apache/spark/repl/SparkILoop.scala
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.repl
+
+import java.io.BufferedReader
+
+// scalastyle:off println
+import scala.Predef.{println => _, _}
+// scalastyle:on println
+import scala.tools.nsc.Settings
+import scala.tools.nsc.interpreter.{ILoop, JPrintWriter}
+import scala.tools.nsc.util.stringFromStream
+import scala.util.Properties.{javaVersion, javaVmName, versionString}
+
+/**
+ *  A Spark-specific interactive shell.
+ */
+class SparkILoop(in0: Option[BufferedReader], out: JPrintWriter)
+    extends ILoop(in0, out) {
+  def this(in0: BufferedReader, out: JPrintWriter) = this(Some(in0), out)
+  def this() = this(None, new JPrintWriter(Console.out, true))
+
+  def initializeSpark() {
+    intp.beQuietDuring {
+      processLine("""
+        @transient val spark = if (org.apache.spark.repl.Main.sparkSession != null) {
+            org.apache.spark.repl.Main.sparkSession
+          } else {
+            org.apache.spark.repl.Main.createSparkSession()
+          }
+        @transient val sc = {
+          val _sc = spark.sparkContext
+          if (_sc.getConf.getBoolean("spark.ui.reverseProxy", false)) {
+            val proxyUrl = _sc.getConf.get("spark.ui.reverseProxyUrl", null)
+            if (proxyUrl != null) {
+              println(
+                s"Spark Context Web UI is available at ${proxyUrl}/proxy/${_sc.applicationId}")
+            } else {
+              println(s"Spark Context Web UI is available at Spark Master Public URL")
+            }
+          } else {
+            _sc.uiWebUrl.foreach {
+              webUrl => println(s"Spark context Web UI available at ${webUrl}")
+            }
+          }
+          println("Spark context available as 'sc' " +
+            s"(master = ${_sc.master}, app id = ${_sc.applicationId}).")
+          println("Spark session available as 'spark'.")
+          _sc
+        }
+        """)
+      processLine("import org.apache.spark.SparkContext._")
+      processLine("import spark.implicits._")
+      processLine("import spark.sql")
+      processLine("import org.apache.spark.sql.functions._")
+    }
+  }
+
+  /** Print a welcome message */
+  override def printWelcome() {
+    import org.apache.spark.SPARK_VERSION
+    echo("""Welcome to
+      ____              __
+     / __/__  ___ _____/ /__
+    _\ \/ _ \/ _ `/ __/  '_/
+   /___/ .__/\_,_/_/ /_/\_\   version %s
+      /_/
+         """.format(SPARK_VERSION))
+    val welcomeMsg = "Using Scala %s (%s, Java %s)".format(
+      versionString, javaVmName, javaVersion)
+    echo(welcomeMsg)
+    echo("Type in expressions to have them evaluated.")
+    echo("Type :help for more information.")
+  }
+
+  /** Available commands */
+  override def commands: List[LoopCommand] = standardCommands
+
+  /**
+   * We override `createInterpreter` because we need to initialize Spark *before* the REPL
+   * sees any files, so that the Spark context is visible in those files. This is a bit of a
+   * hack, but there isn't another hook available to us at this point.
+   */
+  override def createInterpreter(): Unit = {
+    super.createInterpreter()
+    initializeSpark()
+  }
+
+  override def resetCommand(line: String): Unit = {
+    super.resetCommand(line)
+    initializeSpark()
+    echo("Note that after :reset, state of SparkSession and SparkContext is unchanged.")
+  }
+}
+
+object SparkILoop {
+
+  /**
+   * Creates an interpreter loop with default settings and feeds
+   * the given code to it as input.
+   */
+  def run(code: String, sets: Settings = new Settings): String = {
+    import java.io.{ BufferedReader, StringReader, OutputStreamWriter }
+
+    stringFromStream { ostream =>
+      Console.withOut(ostream) {
+        val input = new BufferedReader(new StringReader(code))
+        val output = new JPrintWriter(new OutputStreamWriter(ostream), true)
+        val repl = new SparkILoop(input, output)
+
+        if (sets.classpath.isDefault) {
+          sets.classpath.value = sys.props("java.class.path")
+        }
+        repl process sets
+      }
+    }
+  }
+  def run(lines: List[String]): String = run(lines.map(_ + "\n").mkString)
+}
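
For reference, the SparkILoop.run helper added above feeds a snippet to a throw-away interpreter
and returns the captured transcript as a String. A hedged usage sketch, not part of the patch
(the object name, the snippet, and the spark.master/spark.app.name system properties are
assumptions made so the shell's SparkSession can start locally):

  import org.apache.spark.repl.SparkILoop

  object SparkILoopRunExample {
    def main(args: Array[String]): Unit = {
      // Assumed local setup: no external master, hypothetical application name.
      sys.props("spark.master") = "local[*]"
      sys.props("spark.app.name") = "repl-run-example"
      val transcript = SparkILoop.run(
        """
          |val xs = (1 to 5).toList
          |xs.sum
        """.stripMargin)
      // The returned string is the REPL transcript, e.g. containing "res0: Int = 15".
      println(transcript)
    }
  }

Since createInterpreter calls initializeSpark, the SparkSession is brought up before the snippet
is evaluated, which is why the properties above need to be in place before run is invoked.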

