You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2015/04/13 05:50:53 UTC

[1/2] spark git commit: [SPARK-6765] Fix test code style for core.

Repository: spark
Updated Branches:
  refs/heads/master 04bcd67cf -> a1fe59dae


http://git-wip-us.apache.org/repos/asf/spark/blob/a1fe59da/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
index 30ee63e..6d25edb 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -268,7 +268,7 @@ class EventLoggingListenerSuite extends FunSuite with LocalSparkContext with Bef
 object EventLoggingListenerSuite {
 
   /** Get a SparkConf with event logging enabled. */
-  def getLoggingConf(logDir: Path, compressionCodec: Option[String] = None) = {
+  def getLoggingConf(logDir: Path, compressionCodec: Option[String] = None): SparkConf = {
     val conf = new SparkConf
     conf.set("spark.eventLog.enabled", "true")
     conf.set("spark.eventLog.testing", "true")
@@ -280,5 +280,5 @@ object EventLoggingListenerSuite {
     conf
   }
 
-  def getUniqueApplicationId = "test-" + System.currentTimeMillis
+  def getUniqueApplicationId: String = "test-" + System.currentTimeMillis
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/a1fe59da/core/src/test/scala/org/apache/spark/scheduler/NotSerializableFakeTask.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/NotSerializableFakeTask.scala b/core/src/test/scala/org/apache/spark/scheduler/NotSerializableFakeTask.scala
index 6b75c98..9b92f8d 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/NotSerializableFakeTask.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/NotSerializableFakeTask.scala
@@ -24,7 +24,9 @@ import org.apache.spark.TaskContext
 /**
  * A Task implementation that fails to serialize.
  */
-private[spark] class NotSerializableFakeTask(myId: Int, stageId: Int) extends Task[Array[Byte]](stageId, 0) {
+private[spark] class NotSerializableFakeTask(myId: Int, stageId: Int)
+  extends Task[Array[Byte]](stageId, 0) {
+
   override def runTask(context: TaskContext): Array[Byte] = Array.empty[Byte]
   override def preferredLocations: Seq[TaskLocation] = Seq[TaskLocation]()
 

http://git-wip-us.apache.org/repos/asf/spark/blob/a1fe59da/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index 627c9a4..825c616 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -85,7 +85,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
     val stopperReturned = new Semaphore(0)
 
     class BlockingListener extends SparkListener {
-      override def onJobEnd(jobEnd: SparkListenerJobEnd) = {
+      override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
         listenerStarted.release()
         listenerWait.acquire()
         drained = true
@@ -206,8 +206,9 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
     sc.addSparkListener(new StatsReportListener)
     // just to make sure some of the tasks take a noticeable amount of time
     val w = { i: Int =>
-      if (i == 0)
+      if (i == 0) {
         Thread.sleep(100)
+      }
       i
     }
 
@@ -247,12 +248,12 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
       */
 
       taskInfoMetrics.foreach { case (taskInfo, taskMetrics) =>
-        taskMetrics.resultSize should be > (0l)
+        taskMetrics.resultSize should be > (0L)
         if (stageInfo.rddInfos.exists(info => info.name == d2.name || info.name == d3.name)) {
           taskMetrics.inputMetrics should not be ('defined)
           taskMetrics.outputMetrics should not be ('defined)
           taskMetrics.shuffleWriteMetrics should be ('defined)
-          taskMetrics.shuffleWriteMetrics.get.shuffleBytesWritten should be > (0l)
+          taskMetrics.shuffleWriteMetrics.get.shuffleBytesWritten should be > (0L)
         }
         if (stageInfo.rddInfos.exists(_.name == d4.name)) {
           taskMetrics.shuffleReadMetrics should be ('defined)
@@ -260,7 +261,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
           sm.totalBlocksFetched should be (128)
           sm.localBlocksFetched should be (128)
           sm.remoteBlocksFetched should be (0)
-          sm.remoteBytesRead should be (0l)
+          sm.remoteBytesRead should be (0L)
         }
       }
     }
@@ -406,12 +407,12 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
     val startedGettingResultTasks = new mutable.HashSet[Int]()
     val endedTasks = new mutable.HashSet[Int]()
 
-    override def onTaskStart(taskStart: SparkListenerTaskStart) = synchronized {
+    override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = synchronized {
       startedTasks += taskStart.taskInfo.index
       notify()
     }
 
-    override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = synchronized {
+    override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized {
       endedTasks += taskEnd.taskInfo.index
       notify()
     }
@@ -425,7 +426,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
    * A simple listener that throws an exception on job end.
    */
   private class BadListener extends SparkListener {
-    override def onJobEnd(jobEnd: SparkListenerJobEnd) = { throw new Exception }
+    override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = { throw new Exception }
   }
 
 }
@@ -438,10 +439,10 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
  */
 private class BasicJobCounter extends SparkListener {
   var count = 0
-  override def onJobEnd(job: SparkListenerJobEnd) = count += 1
+  override def onJobEnd(job: SparkListenerJobEnd): Unit = count += 1
 }
 
 private class ListenerThatAcceptsSparkConf(conf: SparkConf) extends SparkListener {
   var count = 0
-  override def onJobEnd(job: SparkListenerJobEnd) = count += 1
+  override def onJobEnd(job: SparkListenerJobEnd): Unit = count += 1
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/a1fe59da/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index add13f5..ffa4381 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.scheduler
 
-import java.util.Properties
-
 import org.scalatest.FunSuite
 
 import org.apache.spark._
@@ -27,7 +25,7 @@ class FakeSchedulerBackend extends SchedulerBackend {
   def start() {}
   def stop() {}
   def reviveOffers() {}
-  def defaultParallelism() = 1
+  def defaultParallelism(): Int = 1
 }
 
 class TaskSchedulerImplSuite extends FunSuite with LocalSparkContext with Logging {
@@ -115,7 +113,8 @@ class TaskSchedulerImplSuite extends FunSuite with LocalSparkContext with Loggin
     }
     val numFreeCores = 1
     taskScheduler.setDAGScheduler(dagScheduler)
-    var taskSet = new TaskSet(Array(new NotSerializableFakeTask(1, 0), new NotSerializableFakeTask(0, 1)), 0, 0, 0, null)
+    val taskSet = new TaskSet(
+      Array(new NotSerializableFakeTask(1, 0), new NotSerializableFakeTask(0, 1)), 0, 0, 0, null)
     val multiCoreWorkerOffers = Seq(new WorkerOffer("executor0", "host0", taskCpus),
       new WorkerOffer("executor1", "host1", numFreeCores))
     taskScheduler.submitTasks(taskSet)
@@ -123,7 +122,8 @@ class TaskSchedulerImplSuite extends FunSuite with LocalSparkContext with Loggin
     assert(0 === taskDescriptions.length)
 
     // Now check that we can still submit tasks
-    // Even if one of the tasks has not-serializable tasks, the other task set should still be processed without error
+    // Even if one of the tasks has not-serializable tasks, the other task set should
+    // still be processed without error
     taskScheduler.submitTasks(taskSet)
     taskScheduler.submitTasks(FakeTask.createTaskSet(1))
     taskDescriptions = taskScheduler.resourceOffers(multiCoreWorkerOffers).flatten

http://git-wip-us.apache.org/repos/asf/spark/blob/a1fe59da/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index 12330d8..716d12c 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -67,7 +67,7 @@ object FakeRackUtil {
     hostToRack(host) = rack
   }
 
-  def getRackForHost(host: String) = {
+  def getRackForHost(host: String): Option[String] = {
     hostToRack.get(host)
   }
 }
@@ -327,8 +327,8 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
     // First offer host1: first task should be chosen
     assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0)
 
-    // After this, nothing should get chosen, because we have separated tasks with unavailable preference
-    // from the noPrefPendingTasks
+    // After this, nothing should get chosen, because we have separated tasks with unavailable
+    // preference from the noPrefPendingTasks
     assert(manager.resourceOffer("exec1", "host1", ANY) === None)
 
     // Now mark host2 as dead
@@ -499,7 +499,8 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
     sched.addExecutor("execC", "host2")
     manager.executorAdded()
     // Valid locality should contain PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL and ANY
-    assert(manager.myLocalityLevels.sameElements(Array(PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY)))
+    assert(manager.myLocalityLevels.sameElements(
+      Array(PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY)))
     // test if the valid locality is recomputed when the executor is lost
     sched.removeExecutor("execC")
     manager.executorLost("execC", "host2")
@@ -569,7 +570,8 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
     sc = new SparkContext("local", "test")
     val sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
 
-    val taskSet = new TaskSet(Array(new NotSerializableFakeTask(1, 0), new NotSerializableFakeTask(0, 1)), 0, 0, 0, null)
+    val taskSet = new TaskSet(
+      Array(new NotSerializableFakeTask(1, 0), new NotSerializableFakeTask(0, 1)), 0, 0, 0, null)
     val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES)
 
     intercept[TaskNotSerializableException] {
@@ -582,7 +584,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
     val conf = new SparkConf().set("spark.driver.maxResultSize", "2m")
     sc = new SparkContext("local", "test", conf)
 
-    def genBytes(size: Int) = { (x: Int) =>
+    def genBytes(size: Int): (Int) => Array[Byte] = { (x: Int) =>
       val bytes = Array.ofDim[Byte](size)
       scala.util.Random.nextBytes(bytes)
       bytes
@@ -605,7 +607,8 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
 
   test("speculative and noPref task should be scheduled after node-local") {
     sc = new SparkContext("local", "test")
-    val sched = new FakeTaskScheduler(sc, ("execA", "host1"), ("execB", "host2"), ("execC", "host3"))
+    val sched = new FakeTaskScheduler(
+      sc, ("execA", "host1"), ("execB", "host2"), ("execC", "host3"))
     val taskSet = FakeTask.createTaskSet(4,
       Seq(TaskLocation("host1", "execA")),
       Seq(TaskLocation("host2"), TaskLocation("host1")),
@@ -629,9 +632,11 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
     assert(manager.resourceOffer("execB", "host2", ANY).get.index === 3)
   }
 
-  test("node-local tasks should be scheduled right away when there are only node-local and no-preference tasks") {
+  test("node-local tasks should be scheduled right away " +
+    "when there are only node-local and no-preference tasks") {
     sc = new SparkContext("local", "test")
-    val sched = new FakeTaskScheduler(sc, ("execA", "host1"), ("execB", "host2"), ("execC", "host3"))
+    val sched = new FakeTaskScheduler(
+      sc, ("execA", "host1"), ("execB", "host2"), ("execC", "host3"))
     val taskSet = FakeTask.createTaskSet(4,
       Seq(TaskLocation("host1")),
       Seq(TaskLocation("host2")),
@@ -650,7 +655,8 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
     assert(manager.resourceOffer("execA", "host3", NO_PREF).get.index === 2)
   }
 
-  test("SPARK-4939: node-local tasks should be scheduled right after process-local tasks finished") {
+  test("SPARK-4939: node-local tasks should be scheduled right after process-local tasks finished")
+  {
     sc = new SparkContext("local", "test")
     val sched = new FakeTaskScheduler(sc, ("execA", "host1"), ("execB", "host2"))
     val taskSet = FakeTask.createTaskSet(4,

http://git-wip-us.apache.org/repos/asf/spark/blob/a1fe59da/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala
index f1a4380..a311512 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala
@@ -63,16 +63,18 @@ class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with Mo
 
     // uri is null.
     val executorInfo = mesosSchedulerBackend.createExecutorInfo("test-id")
-    assert(executorInfo.getCommand.getValue === s" /mesos-home/bin/spark-class ${classOf[MesosExecutorBackend].getName}")
+    assert(executorInfo.getCommand.getValue ===
+      s" /mesos-home/bin/spark-class ${classOf[MesosExecutorBackend].getName}")
 
     // uri exists.
     conf.set("spark.executor.uri", "hdfs:///test-app-1.0.0.tgz")
     val executorInfo1 = mesosSchedulerBackend.createExecutorInfo("test-id")
-    assert(executorInfo1.getCommand.getValue === s"cd test-app-1*;  ./bin/spark-class ${classOf[MesosExecutorBackend].getName}")
+    assert(executorInfo1.getCommand.getValue ===
+      s"cd test-app-1*;  ./bin/spark-class ${classOf[MesosExecutorBackend].getName}")
   }
 
   test("mesos resource offers result in launching tasks") {
-    def createOffer(id: Int, mem: Int, cpu: Int) = {
+    def createOffer(id: Int, mem: Int, cpu: Int): Offer = {
       val builder = Offer.newBuilder()
       builder.addResourcesBuilder()
         .setName("mem")
@@ -82,8 +84,10 @@ class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with Mo
         .setName("cpus")
         .setType(Value.Type.SCALAR)
         .setScalar(Scalar.newBuilder().setValue(cpu))
-      builder.setId(OfferID.newBuilder().setValue(s"o${id.toString}").build()).setFrameworkId(FrameworkID.newBuilder().setValue("f1"))
-        .setSlaveId(SlaveID.newBuilder().setValue(s"s${id.toString}")).setHostname(s"host${id.toString}").build()
+      builder.setId(OfferID.newBuilder().setValue(s"o${id.toString}").build())
+        .setFrameworkId(FrameworkID.newBuilder().setValue("f1"))
+        .setSlaveId(SlaveID.newBuilder().setValue(s"s${id.toString}"))
+        .setHostname(s"host${id.toString}").build()
     }
 
     val driver = mock[SchedulerDriver]

http://git-wip-us.apache.org/repos/asf/spark/blob/a1fe59da/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
index 6198df8..b070a54 100644
--- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
@@ -106,7 +106,9 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext {
     check(mutable.HashMap(1 -> "one", 2 -> "two"))
     check(mutable.HashMap("one" -> 1, "two" -> 2))
     check(List(Some(mutable.HashMap(1->1, 2->2)), None, Some(mutable.HashMap(3->4))))
-    check(List(mutable.HashMap("one" -> 1, "two" -> 2),mutable.HashMap(1->"one",2->"two",3->"three")))
+    check(List(
+      mutable.HashMap("one" -> 1, "two" -> 2),
+      mutable.HashMap(1->"one",2->"two",3->"three")))
   }
 
   test("ranges") {
@@ -169,7 +171,10 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext {
 
   test("kryo with collect") {
     val control = 1 :: 2 :: Nil
-    val result = sc.parallelize(control, 2).map(new ClassWithoutNoArgConstructor(_)).collect().map(_.x)
+    val result = sc.parallelize(control, 2)
+      .map(new ClassWithoutNoArgConstructor(_))
+      .collect()
+      .map(_.x)
     assert(control === result.toSeq)
   }
 
@@ -237,7 +242,7 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext {
 
     // Set a special, broken ClassLoader and make sure we get an exception on deserialization
     ser.setDefaultClassLoader(new ClassLoader() {
-      override def loadClass(name: String) = throw new UnsupportedOperationException
+      override def loadClass(name: String): Class[_] = throw new UnsupportedOperationException
     })
     intercept[UnsupportedOperationException] {
       ser.newInstance().deserialize[ClassLoaderTestingObject](bytes)
@@ -287,14 +292,14 @@ object KryoTest {
 
   class ClassWithNoArgConstructor {
     var x: Int = 0
-    override def equals(other: Any) = other match {
+    override def equals(other: Any): Boolean = other match {
       case c: ClassWithNoArgConstructor => x == c.x
       case _ => false
     }
   }
 
   class ClassWithoutNoArgConstructor(val x: Int) {
-    override def equals(other: Any) = other match {
+    override def equals(other: Any): Boolean = other match {
       case c: ClassWithoutNoArgConstructor => x == c.x
       case _ => false
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/a1fe59da/core/src/test/scala/org/apache/spark/serializer/ProactiveClosureSerializationSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/serializer/ProactiveClosureSerializationSuite.scala b/core/src/test/scala/org/apache/spark/serializer/ProactiveClosureSerializationSuite.scala
index d037e2c..433fd6b 100644
--- a/core/src/test/scala/org/apache/spark/serializer/ProactiveClosureSerializationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/ProactiveClosureSerializationSuite.scala
@@ -24,14 +24,16 @@ import org.apache.spark.rdd.RDD
 
 /* A trivial (but unserializable) container for trivial functions */
 class UnserializableClass {
-  def op[T](x: T) = x.toString
+  def op[T](x: T): String = x.toString
   
-  def pred[T](x: T) = x.toString.length % 2 == 0
+  def pred[T](x: T): Boolean = x.toString.length % 2 == 0
 }
 
 class ProactiveClosureSerializationSuite extends FunSuite with SharedSparkContext {
 
-  def fixture = (sc.parallelize(0 until 1000).map(_.toString), new UnserializableClass)
+  def fixture: (RDD[String], UnserializableClass) = {
+    (sc.parallelize(0 until 1000).map(_.toString), new UnserializableClass)
+  }
 
   test("throws expected serialization exceptions on actions") {
     val (data, uc) = fixture

http://git-wip-us.apache.org/repos/asf/spark/blob/a1fe59da/core/src/test/scala/org/apache/spark/serializer/TestSerializer.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/serializer/TestSerializer.scala b/core/src/test/scala/org/apache/spark/serializer/TestSerializer.scala
index 0ade1ba..963264c 100644
--- a/core/src/test/scala/org/apache/spark/serializer/TestSerializer.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/TestSerializer.scala
@@ -27,7 +27,7 @@ import scala.reflect.ClassTag
  * A serializer implementation that always return a single element in a deserialization stream.
  */
 class TestSerializer extends Serializer {
-  override def newInstance() = new TestSerializerInstance
+  override def newInstance(): TestSerializerInstance = new TestSerializerInstance
 }
 
 
@@ -36,7 +36,8 @@ class TestSerializerInstance extends SerializerInstance {
 
   override def serializeStream(s: OutputStream): SerializationStream = ???
 
-  override def deserializeStream(s: InputStream) = new TestDeserializationStream
+  override def deserializeStream(s: InputStream): TestDeserializationStream =
+    new TestDeserializationStream
 
   override def deserialize[T: ClassTag](bytes: ByteBuffer): T = ???
 

http://git-wip-us.apache.org/repos/asf/spark/blob/a1fe59da/core/src/test/scala/org/apache/spark/shuffle/hash/HashShuffleManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/shuffle/hash/HashShuffleManagerSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/hash/HashShuffleManagerSuite.scala
index b834dc0..7d76435 100644
--- a/core/src/test/scala/org/apache/spark/shuffle/hash/HashShuffleManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/shuffle/hash/HashShuffleManagerSuite.scala
@@ -85,8 +85,8 @@ class HashShuffleManagerSuite extends FunSuite with LocalSparkContext {
     // Now comes the test :
     // Write to shuffle 3; and close it, but before registering it, check if the file lengths for
     // previous task (forof shuffle1) is the same as 'segments'. Earlier, we were inferring length
-    // of block based on remaining data in file : which could mess things up when there is concurrent read
-    // and writes happening to the same shuffle group.
+    // of block based on remaining data in file : which could mess things up when there is
+    // concurrent read and writes happening to the same shuffle group.
 
     val shuffle3 = shuffleBlockManager.forMapTask(1, 3, 1, new JavaSerializer(testConf),
       new ShuffleWriteMetrics)

http://git-wip-us.apache.org/repos/asf/spark/blob/a1fe59da/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 6dc5bc4..545722b 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -60,7 +60,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfterEach
 
   // Implicitly convert strings to BlockIds for test clarity.
   implicit def StringToBlockId(value: String): BlockId = new TestBlockId(value)
-  def rdd(rddId: Int, splitId: Int) = RDDBlockId(rddId, splitId)
+  def rdd(rddId: Int, splitId: Int): RDDBlockId = RDDBlockId(rddId, splitId)
 
   private def makeBlockManager(
       maxMem: Long,
@@ -107,8 +107,10 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfterEach
 
   test("StorageLevel object caching") {
     val level1 = StorageLevel(false, false, false, false, 3)
-    val level2 = StorageLevel(false, false, false, false, 3) // this should return the same object as level1
-    val level3 = StorageLevel(false, false, false, false, 2) // this should return a different object
+    // this should return the same object as level1
+    val level2 = StorageLevel(false, false, false, false, 3)
+    // this should return a different object
+    val level3 = StorageLevel(false, false, false, false, 2)
     assert(level2 === level1, "level2 is not same as level1")
     assert(level2.eq(level1), "level2 is not the same object as level1")
     assert(level3 != level1, "level3 is same as level1")
@@ -802,7 +804,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfterEach
 
     // Create a non-trivial (not all zeros) byte array
     var counter = 0.toByte
-    def incr = {counter = (counter + 1).toByte; counter;}
+    def incr: Byte = {counter = (counter + 1).toByte; counter;}
     val bytes = Array.fill[Byte](1000)(incr)
     val byteBuffer = ByteBuffer.wrap(bytes)
 
@@ -956,8 +958,10 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfterEach
     store.putIterator("list3", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
 
     // getLocations and getBlockStatus should yield the same locations
-    assert(store.master.getMatchingBlockIds(_.toString.contains("list"), askSlaves = false).size === 3)
-    assert(store.master.getMatchingBlockIds(_.toString.contains("list1"), askSlaves = false).size === 1)
+    assert(store.master.getMatchingBlockIds(_.toString.contains("list"), askSlaves = false).size
+      === 3)
+    assert(store.master.getMatchingBlockIds(_.toString.contains("list1"), askSlaves = false).size
+      === 1)
 
     // insert some more blocks
     store.putIterator("newlist1", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
@@ -965,8 +969,10 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfterEach
     store.putIterator("newlist3", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
 
     // getLocations and getBlockStatus should yield the same locations
-    assert(store.master.getMatchingBlockIds(_.toString.contains("newlist"), askSlaves = false).size === 1)
-    assert(store.master.getMatchingBlockIds(_.toString.contains("newlist"), askSlaves = true).size === 3)
+    assert(store.master.getMatchingBlockIds(_.toString.contains("newlist"), askSlaves = false).size
+      === 1)
+    assert(store.master.getMatchingBlockIds(_.toString.contains("newlist"), askSlaves = true).size
+      === 3)
 
     val blockIds = Seq(RDDBlockId(1, 0), RDDBlockId(1, 1), RDDBlockId(2, 0))
     blockIds.foreach { blockId =>
@@ -1090,8 +1096,8 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfterEach
     val memoryStore = store.memoryStore
     val smallList = List.fill(40)(new Array[Byte](100))
     val bigList = List.fill(40)(new Array[Byte](1000))
-    def smallIterator = smallList.iterator.asInstanceOf[Iterator[Any]]
-    def bigIterator = bigList.iterator.asInstanceOf[Iterator[Any]]
+    def smallIterator: Iterator[Any] = smallList.iterator.asInstanceOf[Iterator[Any]]
+    def bigIterator: Iterator[Any] = bigList.iterator.asInstanceOf[Iterator[Any]]
     assert(memoryStore.currentUnrollMemoryForThisThread === 0)
 
     // Unroll with plenty of space. This should succeed and cache both blocks.
@@ -1144,8 +1150,8 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfterEach
     val diskStore = store.diskStore
     val smallList = List.fill(40)(new Array[Byte](100))
     val bigList = List.fill(40)(new Array[Byte](1000))
-    def smallIterator = smallList.iterator.asInstanceOf[Iterator[Any]]
-    def bigIterator = bigList.iterator.asInstanceOf[Iterator[Any]]
+    def smallIterator: Iterator[Any] = smallList.iterator.asInstanceOf[Iterator[Any]]
+    def bigIterator: Iterator[Any] = bigList.iterator.asInstanceOf[Iterator[Any]]
     assert(memoryStore.currentUnrollMemoryForThisThread === 0)
 
     store.putIterator("b1", smallIterator, memAndDisk)
@@ -1187,7 +1193,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfterEach
     val memOnly = StorageLevel.MEMORY_ONLY
     val memoryStore = store.memoryStore
     val smallList = List.fill(40)(new Array[Byte](100))
-    def smallIterator = smallList.iterator.asInstanceOf[Iterator[Any]]
+    def smallIterator: Iterator[Any] = smallList.iterator.asInstanceOf[Iterator[Any]]
     assert(memoryStore.currentUnrollMemoryForThisThread === 0)
 
     // All unroll memory used is released because unrollSafely returned an array

http://git-wip-us.apache.org/repos/asf/spark/blob/a1fe59da/core/src/test/scala/org/apache/spark/storage/LocalDirsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/LocalDirsSuite.scala b/core/src/test/scala/org/apache/spark/storage/LocalDirsSuite.scala
index 82a82e2..b47157f 100644
--- a/core/src/test/scala/org/apache/spark/storage/LocalDirsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/LocalDirsSuite.scala
@@ -47,7 +47,7 @@ class LocalDirsSuite extends FunSuite with BeforeAndAfter {
     assert(!new File("/NONEXISTENT_DIR").exists())
     // SPARK_LOCAL_DIRS is a valid directory:
     class MySparkConf extends SparkConf(false) {
-      override def getenv(name: String) = {
+      override def getenv(name: String): String = {
         if (name == "SPARK_LOCAL_DIRS") System.getProperty("java.io.tmpdir")
         else super.getenv(name)
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/a1fe59da/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
index 0d15598..1cb5946 100644
--- a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
@@ -137,7 +137,7 @@ class UISeleniumSuite extends FunSuite with WebBrowser with Matchers with Before
       new SparkContext(conf)
     }
 
-    def hasKillLink = find(className("kill-link")).isDefined
+    def hasKillLink: Boolean = find(className("kill-link")).isDefined
     def runSlowJob(sc: SparkContext) {
       sc.parallelize(1 to 10).map{x => Thread.sleep(10000); x}.countAsync()
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/a1fe59da/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index c0c28cb..21d8267 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -269,7 +269,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
     val taskType = Utils.getFormattedClassName(new ShuffleMapTask(0))
     val execId = "exe-1"
 
-    def makeTaskMetrics(base: Int) = {
+    def makeTaskMetrics(base: Int): TaskMetrics = {
       val taskMetrics = new TaskMetrics()
       val shuffleReadMetrics = new ShuffleReadMetrics()
       val shuffleWriteMetrics = new ShuffleWriteMetrics()
@@ -291,7 +291,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
       taskMetrics
     }
 
-    def makeTaskInfo(taskId: Long, finishTime: Int = 0) = {
+    def makeTaskInfo(taskId: Long, finishTime: Int = 0): TaskInfo = {
       val taskInfo = new TaskInfo(taskId, 0, 1, 0L, execId, "host1", TaskLocality.NODE_LOCAL,
         false)
       taskInfo.finishTime = finishTime

http://git-wip-us.apache.org/repos/asf/spark/blob/a1fe59da/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
index e1bc137..3744e47 100644
--- a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
@@ -107,7 +107,8 @@ class StorageTabSuite extends FunSuite with BeforeAndAfter {
     val myRddInfo0 = rddInfo0
     val myRddInfo1 = rddInfo1
     val myRddInfo2 = rddInfo2
-    val stageInfo0 = new StageInfo(0, 0, "0", 100, Seq(myRddInfo0, myRddInfo1, myRddInfo2), "details")
+    val stageInfo0 = new StageInfo(
+      0, 0, "0", 100, Seq(myRddInfo0, myRddInfo1, myRddInfo2), "details")
     bus.postToAll(SparkListenerBlockManagerAdded(1L, bm1, 1000L))
     bus.postToAll(SparkListenerStageSubmitted(stageInfo0))
     assert(storageListener._rddInfoMap.size === 3)

http://git-wip-us.apache.org/repos/asf/spark/blob/a1fe59da/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala
index 054ef54..c471627 100644
--- a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala
@@ -83,7 +83,7 @@ object TestObject {
 class TestClass extends Serializable {
   var x = 5
 
-  def getX = x
+  def getX: Int = x
 
   def run(): Int = {
     var nonSer = new NonSerializable
@@ -95,7 +95,7 @@ class TestClass extends Serializable {
 }
 
 class TestClassWithoutDefaultConstructor(x: Int) extends Serializable {
-  def getX = x
+  def getX: Int = x
 
   def run(): Int = {
     var nonSer = new NonSerializable
@@ -164,7 +164,7 @@ object TestObjectWithNesting {
 }
 
 class TestClassWithNesting(val y: Int) extends Serializable {
-  def getY = y
+  def getY: Int = y
 
   def run(): Int = {
     var nonSer = new NonSerializable

http://git-wip-us.apache.org/repos/asf/spark/blob/a1fe59da/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala b/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
index 43b6a40..c053175 100644
--- a/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
@@ -109,7 +109,8 @@ class FileAppenderSuite extends FunSuite with BeforeAndAfter with Logging {
 
     // verify whether the earliest file has been deleted
     val rolledOverFiles = allGeneratedFiles.filter { _ != testFile.toString }.toArray.sorted
-    logInfo(s"All rolled over files generated:${rolledOverFiles.size}\n" + rolledOverFiles.mkString("\n"))
+    logInfo(s"All rolled over files generated:${rolledOverFiles.size}\n" +
+      rolledOverFiles.mkString("\n"))
     assert(rolledOverFiles.size > 2)
     val earliestRolledOverFile = rolledOverFiles.head
     val existingRolledOverFiles = RollingFileAppender.getSortedRolledOverFiles(
@@ -135,7 +136,7 @@ class FileAppenderSuite extends FunSuite with BeforeAndAfter with Logging {
       val testOutputStream = new PipedOutputStream()
       val testInputStream = new PipedInputStream(testOutputStream)
       val appender = FileAppender(testInputStream, testFile, conf)
-      //assert(appender.getClass === classTag[ExpectedAppender].getClass)
+      // assert(appender.getClass === classTag[ExpectedAppender].getClass)
       assert(appender.getClass.getSimpleName ===
         classTag[ExpectedAppender].runtimeClass.getSimpleName)
       if (appender.isInstanceOf[RollingFileAppender]) {
@@ -153,9 +154,11 @@ class FileAppenderSuite extends FunSuite with BeforeAndAfter with Logging {
 
     import RollingFileAppender._
 
-    def rollingStrategy(strategy: String) = Seq(STRATEGY_PROPERTY -> strategy)
-    def rollingSize(size: String) = Seq(SIZE_PROPERTY -> size)
-    def rollingInterval(interval: String) = Seq(INTERVAL_PROPERTY -> interval)
+    def rollingStrategy(strategy: String): Seq[(String, String)] =
+      Seq(STRATEGY_PROPERTY -> strategy)
+    def rollingSize(size: String): Seq[(String, String)] = Seq(SIZE_PROPERTY -> size)
+    def rollingInterval(interval: String): Seq[(String, String)] =
+      Seq(INTERVAL_PROPERTY -> interval)
 
     val msInDay = 24 * 60 * 60 * 1000L
     val msInHour = 60 * 60 * 1000L

http://git-wip-us.apache.org/repos/asf/spark/blob/a1fe59da/core/src/test/scala/org/apache/spark/util/NextIteratorSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/NextIteratorSuite.scala b/core/src/test/scala/org/apache/spark/util/NextIteratorSuite.scala
index 72e81f3..403dcb0 100644
--- a/core/src/test/scala/org/apache/spark/util/NextIteratorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/NextIteratorSuite.scala
@@ -71,7 +71,7 @@ class NextIteratorSuite extends FunSuite with Matchers {
   class StubIterator(ints: Buffer[Int])  extends NextIterator[Int] {
     var closeCalled = 0
 
-    override def getNext() = {
+    override def getNext(): Int = {
       if (ints.size == 0) {
         finished = true
         0

http://git-wip-us.apache.org/repos/asf/spark/blob/a1fe59da/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala b/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala
index 7424c2e..67a9f75 100644
--- a/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala
@@ -98,8 +98,10 @@ class SizeEstimatorSuite
 
     // If an array contains the *same* element many times, we should only count it once.
     val d1 = new DummyClass1
-    assertResult(72)(SizeEstimator.estimate(Array.fill(10)(d1))) // 10 pointers plus 8-byte object
-    assertResult(432)(SizeEstimator.estimate(Array.fill(100)(d1))) // 100 pointers plus 8-byte object
+    // 10 pointers plus 8-byte object
+    assertResult(72)(SizeEstimator.estimate(Array.fill(10)(d1)))
+    // 100 pointers plus 8-byte object
+    assertResult(432)(SizeEstimator.estimate(Array.fill(100)(d1)))
 
     // Same thing with huge array containing the same element many times. Note that this won't
     // return exactly 4032 because it can't tell that *all* the elements will equal the first

http://git-wip-us.apache.org/repos/asf/spark/blob/a1fe59da/core/src/test/scala/org/apache/spark/util/TimeStampedHashMapSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/TimeStampedHashMapSuite.scala b/core/src/test/scala/org/apache/spark/util/TimeStampedHashMapSuite.scala
index c1c605c..8b72fe6 100644
--- a/core/src/test/scala/org/apache/spark/util/TimeStampedHashMapSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/TimeStampedHashMapSuite.scala
@@ -63,7 +63,7 @@ class TimeStampedHashMapSuite extends FunSuite {
     assert(map1.getTimestamp("k1").get < threshTime1)
     assert(map1.getTimestamp("k2").isDefined)
     assert(map1.getTimestamp("k2").get >= threshTime1)
-    map1.clearOldValues(threshTime1) //should only clear k1
+    map1.clearOldValues(threshTime1) // should only clear k1
     assert(map1.get("k1") === None)
     assert(map1.get("k2").isDefined)
   }
@@ -93,7 +93,7 @@ class TimeStampedHashMapSuite extends FunSuite {
     assert(map1.getTimestamp("k1").get < threshTime1)
     assert(map1.getTimestamp("k2").isDefined)
     assert(map1.getTimestamp("k2").get >= threshTime1)
-    map1.clearOldValues(threshTime1) //should only clear k1
+    map1.clearOldValues(threshTime1) // should only clear k1
     assert(map1.get("k1") === None)
     assert(map1.get("k2").isDefined)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/a1fe59da/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index 5d93086..449fb87 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -106,7 +106,7 @@ class UtilsSuite extends FunSuite with ResetSystemProperties {
     val second = 1000
     val minute = second * 60
     val hour = minute * 60
-    def str = Utils.msDurationToString(_)
+    def str: (Long) => String = Utils.msDurationToString(_)
 
     val sep = new DecimalFormatSymbols(Locale.getDefault()).getDecimalSeparator()
 
@@ -199,7 +199,8 @@ class UtilsSuite extends FunSuite with ResetSystemProperties {
   test("doesDirectoryContainFilesNewerThan") {
     // create some temporary directories and files
     val parent: File = Utils.createTempDir()
-    val child1: File = Utils.createTempDir(parent.getCanonicalPath) // The parent directory has two child directories
+    // The parent directory has two child directories
+    val child1: File = Utils.createTempDir(parent.getCanonicalPath)
     val child2: File = Utils.createTempDir(parent.getCanonicalPath)
     val child3: File = Utils.createTempDir(child1.getCanonicalPath)
     // set the last modified time of child1 to 30 secs old

http://git-wip-us.apache.org/repos/asf/spark/blob/a1fe59da/core/src/test/scala/org/apache/spark/util/VectorSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/VectorSuite.scala b/core/src/test/scala/org/apache/spark/util/VectorSuite.scala
index 794a55d..ce29687 100644
--- a/core/src/test/scala/org/apache/spark/util/VectorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/VectorSuite.scala
@@ -27,7 +27,7 @@ import org.scalatest.FunSuite
 @deprecated("suppress compile time deprecation warning", "1.0.0")
 class VectorSuite extends FunSuite {
 
-  def verifyVector(vector: Vector, expectedLength: Int) = {
+  def verifyVector(vector: Vector, expectedLength: Int): Unit = {
     assert(vector.length == expectedLength)
     assert(vector.elements.min > 0.0)
     assert(vector.elements.max < 1.0)

http://git-wip-us.apache.org/repos/asf/spark/blob/a1fe59da/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
index 48f79ea..dff8f3d 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
@@ -185,7 +185,7 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
 
     // reduceByKey
     val rdd = sc.parallelize(1 to 10).map(i => (i%2, 1))
-    val result1 = rdd.reduceByKey(_+_).collect()
+    val result1 = rdd.reduceByKey(_ + _).collect()
     assert(result1.toSet === Set[(Int, Int)]((0, 5), (1, 5)))
 
     // groupByKey

http://git-wip-us.apache.org/repos/asf/spark/blob/a1fe59da/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
index 72d9679..9ff067f 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
@@ -553,10 +553,10 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext with PrivateMe
     conf.set("spark.shuffle.memoryFraction", "0.001")
     sc = new SparkContext("local-cluster[1,1,512]", "test", conf)
 
-    def createCombiner(i: String) = ArrayBuffer[String](i)
-    def mergeValue(buffer: ArrayBuffer[String], i: String) = buffer += i
-    def mergeCombiners(buffer1: ArrayBuffer[String], buffer2: ArrayBuffer[String]) =
-      buffer1 ++= buffer2
+    def createCombiner(i: String): ArrayBuffer[String] = ArrayBuffer[String](i)
+    def mergeValue(buffer: ArrayBuffer[String], i: String): ArrayBuffer[String] = buffer += i
+    def mergeCombiners(buffer1: ArrayBuffer[String], buffer2: ArrayBuffer[String])
+      : ArrayBuffer[String] = buffer1 ++= buffer2
 
     val agg = new Aggregator[String, String, ArrayBuffer[String]](
       createCombiner _, mergeValue _, mergeCombiners _)
@@ -633,14 +633,17 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext with PrivateMe
     conf.set("spark.shuffle.memoryFraction", "0.001")
     sc = new SparkContext("local-cluster[1,1,512]", "test", conf)
 
-    def createCombiner(i: Int) = ArrayBuffer[Int](i)
-    def mergeValue(buffer: ArrayBuffer[Int], i: Int) = buffer += i
-    def mergeCombiners(buf1: ArrayBuffer[Int], buf2: ArrayBuffer[Int]) = buf1 ++= buf2
+    def createCombiner(i: Int): ArrayBuffer[Int] = ArrayBuffer[Int](i)
+    def mergeValue(buffer: ArrayBuffer[Int], i: Int): ArrayBuffer[Int] = buffer += i
+    def mergeCombiners(buf1: ArrayBuffer[Int], buf2: ArrayBuffer[Int]): ArrayBuffer[Int] = {
+      buf1 ++= buf2
+    }
 
     val agg = new Aggregator[Int, Int, ArrayBuffer[Int]](createCombiner, mergeValue, mergeCombiners)
     val sorter = new ExternalSorter[Int, Int, ArrayBuffer[Int]](Some(agg), None, None, None)
 
-    sorter.insertAll((1 to 100000).iterator.map(i => (i, i)) ++ Iterator((Int.MaxValue, Int.MaxValue)))
+    sorter.insertAll(
+      (1 to 100000).iterator.map(i => (i, i)) ++ Iterator((Int.MaxValue, Int.MaxValue)))
 
     val it = sorter.iterator
     while (it.hasNext) {
@@ -654,9 +657,10 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext with PrivateMe
     conf.set("spark.shuffle.memoryFraction", "0.001")
     sc = new SparkContext("local-cluster[1,1,512]", "test", conf)
 
-    def createCombiner(i: String) = ArrayBuffer[String](i)
-    def mergeValue(buffer: ArrayBuffer[String], i: String) = buffer += i
-    def mergeCombiners(buf1: ArrayBuffer[String], buf2: ArrayBuffer[String]) = buf1 ++= buf2
+    def createCombiner(i: String): ArrayBuffer[String] = ArrayBuffer[String](i)
+    def mergeValue(buffer: ArrayBuffer[String], i: String): ArrayBuffer[String] = buffer += i
+    def mergeCombiners(buf1: ArrayBuffer[String], buf2: ArrayBuffer[String]): ArrayBuffer[String] =
+      buf1 ++= buf2
 
     val agg = new Aggregator[String, String, ArrayBuffer[String]](
       createCombiner, mergeValue, mergeCombiners)
@@ -720,7 +724,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext with PrivateMe
     // Using wrongOrdering to show integer overflow introduced exception.
     val rand = new Random(100L)
     val wrongOrdering = new Ordering[String] {
-      override def compare(a: String, b: String) = {
+      override def compare(a: String, b: String): Int = {
         val h1 = if (a == null) 0 else a.hashCode()
         val h2 = if (b == null) 0 else b.hashCode()
         h1 - h2
@@ -742,9 +746,10 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext with PrivateMe
 
     // Using aggregation and external spill to make sure ExternalSorter using
     // partitionKeyComparator.
-    def createCombiner(i: String) = ArrayBuffer(i)
-    def mergeValue(c: ArrayBuffer[String], i: String) = c += i
-    def mergeCombiners(c1: ArrayBuffer[String], c2: ArrayBuffer[String]) = c1 ++= c2
+    def createCombiner(i: String): ArrayBuffer[String] = ArrayBuffer(i)
+    def mergeValue(c: ArrayBuffer[String], i: String): ArrayBuffer[String] = c += i
+    def mergeCombiners(c1: ArrayBuffer[String], c2: ArrayBuffer[String]): ArrayBuffer[String] =
+      c1 ++= c2
 
     val agg = new Aggregator[String, String, ArrayBuffer[String]](
       createCombiner, mergeValue, mergeCombiners)

http://git-wip-us.apache.org/repos/asf/spark/blob/a1fe59da/core/src/test/scala/org/apache/spark/util/random/XORShiftRandomSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/random/XORShiftRandomSuite.scala b/core/src/test/scala/org/apache/spark/util/random/XORShiftRandomSuite.scala
index ef7178b..03f5f2d 100644
--- a/core/src/test/scala/org/apache/spark/util/random/XORShiftRandomSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/random/XORShiftRandomSuite.scala
@@ -28,7 +28,7 @@ import scala.language.reflectiveCalls
 
 class XORShiftRandomSuite extends FunSuite with Matchers {
 
-  def fixture = new {
+  def fixture: Object {val seed: Long; val hundMil: Int; val xorRand: XORShiftRandom} = new {
     val seed = 1L
     val xorRand = new XORShiftRandom(seed)
     val hundMil = 1e8.toInt


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


[2/2] spark git commit: [SPARK-6765] Fix test code style for core.

Posted by rx...@apache.org.
[SPARK-6765] Fix test code style for core.

Author: Reynold Xin <rx...@databricks.com>

Closes #5484 from rxin/test-style-core and squashes the following commits:

e0b0100 [Reynold Xin] [SPARK-6765] Fix test code style for core.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a1fe59da
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a1fe59da
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a1fe59da

Branch: refs/heads/master
Commit: a1fe59dae50f551d02dd18676308eca054ff6b07
Parents: 04bcd67
Author: Reynold Xin <rx...@databricks.com>
Authored: Sun Apr 12 20:50:49 2015 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Sun Apr 12 20:50:49 2015 -0700

----------------------------------------------------------------------
 .../org/apache/spark/AccumulatorSuite.scala     |  30 +++---
 .../org/apache/spark/CacheManagerSuite.scala    |   7 +-
 .../org/apache/spark/CheckpointSuite.scala      |  15 +--
 .../org/apache/spark/ContextCleanerSuite.scala  |   4 +-
 .../test/scala/org/apache/spark/FileSuite.scala |  26 +++--
 .../apache/spark/ImplicitOrderingSuite.scala    |   6 +-
 .../org/apache/spark/JobCancellationSuite.scala |   2 +-
 .../org/apache/spark/LocalSparkContext.scala    |   4 +-
 .../org/apache/spark/PartitioningSuite.scala    |  30 +++---
 .../org/apache/spark/SSLOptionsSuite.scala      |  15 ++-
 .../org/apache/spark/SSLSampleConfigs.scala     |   7 +-
 .../scala/org/apache/spark/ShuffleSuite.scala   |   8 +-
 .../org/apache/spark/SparkContextSuite.scala    |   6 +-
 .../org/apache/spark/StatusTrackerSuite.scala   |   5 +-
 .../apache/spark/broadcast/BroadcastSuite.scala |   2 +-
 .../apache/spark/deploy/JsonProtocolSuite.scala |   6 +-
 .../spark/deploy/LogUrlsStandaloneSuite.scala   |   2 +-
 .../deploy/history/HistoryServerSuite.scala     |   4 +-
 .../deploy/rest/StandaloneRestSubmitSuite.scala |   4 +-
 .../deploy/rest/SubmitRestProtocolSuite.scala   |   3 +-
 .../deploy/worker/ExecutorRunnerTest.scala      |   3 +-
 .../deploy/worker/WorkerArgumentsTest.scala     |   4 +-
 .../spark/deploy/worker/WorkerSuite.scala       |   6 +-
 .../spark/metrics/InputOutputMetricsSuite.scala |   2 +-
 .../spark/metrics/MetricsConfigSuite.scala      |  15 ++-
 .../org/apache/spark/rdd/JdbcRDDSuite.scala     |   4 +-
 .../spark/rdd/PairRDDFunctionsSuite.scala       |  41 ++++----
 .../rdd/ParallelCollectionSplitSuite.scala      |  20 ++--
 .../spark/rdd/PartitionPruningRDDSuite.scala    |   7 +-
 .../rdd/PartitionwiseSampledRDDSuite.scala      |   2 +-
 .../scala/org/apache/spark/rdd/RDDSuite.scala   |  22 ++--
 .../org/apache/spark/rdd/RDDSuiteUtils.scala    |   4 +-
 .../org/apache/spark/rpc/RpcEnvSuite.scala      |  40 +++----
 .../spark/scheduler/DAGSchedulerSuite.scala     | 103 ++++++++++---------
 .../scheduler/EventLoggingListenerSuite.scala   |   4 +-
 .../scheduler/NotSerializableFakeTask.scala     |   4 +-
 .../spark/scheduler/SparkListenerSuite.scala    |  21 ++--
 .../scheduler/TaskSchedulerImplSuite.scala      |  10 +-
 .../spark/scheduler/TaskSetManagerSuite.scala   |  26 +++--
 .../mesos/MesosSchedulerBackendSuite.scala      |  14 ++-
 .../spark/serializer/KryoSerializerSuite.scala  |  15 ++-
 .../ProactiveClosureSerializationSuite.scala    |   8 +-
 .../spark/serializer/TestSerializer.scala       |   5 +-
 .../shuffle/hash/HashShuffleManagerSuite.scala  |   4 +-
 .../spark/storage/BlockManagerSuite.scala       |  32 +++---
 .../apache/spark/storage/LocalDirsSuite.scala   |   2 +-
 .../org/apache/spark/ui/UISeleniumSuite.scala   |   2 +-
 .../ui/jobs/JobProgressListenerSuite.scala      |   4 +-
 .../spark/ui/storage/StorageTabSuite.scala      |   3 +-
 .../apache/spark/util/ClosureCleanerSuite.scala |   6 +-
 .../apache/spark/util/FileAppenderSuite.scala   |  13 ++-
 .../apache/spark/util/NextIteratorSuite.scala   |   2 +-
 .../apache/spark/util/SizeEstimatorSuite.scala  |   6 +-
 .../spark/util/TimeStampedHashMapSuite.scala    |   4 +-
 .../org/apache/spark/util/UtilsSuite.scala      |   5 +-
 .../org/apache/spark/util/VectorSuite.scala     |   2 +-
 .../collection/ExternalAppendOnlyMapSuite.scala |   2 +-
 .../util/collection/ExternalSorterSuite.scala   |  35 ++++---
 .../spark/util/random/XORShiftRandomSuite.scala |   2 +-
 59 files changed, 386 insertions(+), 304 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a1fe59da/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
index bd0f8bd..7539946 100644
--- a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
@@ -27,19 +27,20 @@ import org.scalatest.Matchers
 class AccumulatorSuite extends FunSuite with Matchers with LocalSparkContext {
 
 
-  implicit def setAccum[A] = new AccumulableParam[mutable.Set[A], A] {
-    def addInPlace(t1: mutable.Set[A], t2: mutable.Set[A]) : mutable.Set[A] = {
-      t1 ++= t2
-      t1
-    }
-    def addAccumulator(t1: mutable.Set[A], t2: A) : mutable.Set[A] = {
-      t1 += t2
-      t1
-    }
-    def zero(t: mutable.Set[A]) : mutable.Set[A] = {
-      new mutable.HashSet[A]()
+  implicit def setAccum[A]: AccumulableParam[mutable.Set[A], A] =
+    new AccumulableParam[mutable.Set[A], A] {
+      def addInPlace(t1: mutable.Set[A], t2: mutable.Set[A]) : mutable.Set[A] = {
+        t1 ++= t2
+        t1
+      }
+      def addAccumulator(t1: mutable.Set[A], t2: A) : mutable.Set[A] = {
+        t1 += t2
+        t1
+      }
+      def zero(t: mutable.Set[A]) : mutable.Set[A] = {
+        new mutable.HashSet[A]()
+      }
     }
-  }
 
   test ("basic accumulation"){
     sc = new SparkContext("local", "test")
@@ -49,11 +50,10 @@ class AccumulatorSuite extends FunSuite with Matchers with LocalSparkContext {
     d.foreach{x => acc += x}
     acc.value should be (210)
 
-
-    val longAcc = sc.accumulator(0l)
+    val longAcc = sc.accumulator(0L)
     val maxInt = Integer.MAX_VALUE.toLong
     d.foreach{x => longAcc += maxInt + x}
-    longAcc.value should be (210l + maxInt * 20)
+    longAcc.value should be (210L + maxInt * 20)
   }
 
   test ("value not assignable from tasks") {

http://git-wip-us.apache.org/repos/asf/spark/blob/a1fe59da/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
index 4b25c20..70529d9 100644
--- a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
@@ -45,16 +45,17 @@ class CacheManagerSuite extends FunSuite with LocalSparkContext with BeforeAndAf
     rdd = new RDD[Int](sc, Nil) {
       override def getPartitions: Array[Partition] = Array(split)
       override val getDependencies = List[Dependency[_]]()
-      override def compute(split: Partition, context: TaskContext) = Array(1, 2, 3, 4).iterator
+      override def compute(split: Partition, context: TaskContext): Iterator[Int] =
+        Array(1, 2, 3, 4).iterator
     }
     rdd2 = new RDD[Int](sc, List(new OneToOneDependency(rdd))) {
       override def getPartitions: Array[Partition] = firstParent[Int].partitions
-      override def compute(split: Partition, context: TaskContext) =
+      override def compute(split: Partition, context: TaskContext): Iterator[Int] =
         firstParent[Int].iterator(split, context)
     }.cache()
     rdd3 = new RDD[Int](sc, List(new OneToOneDependency(rdd2))) {
       override def getPartitions: Array[Partition] = firstParent[Int].partitions
-      override def compute(split: Partition, context: TaskContext) =
+      override def compute(split: Partition, context: TaskContext): Iterator[Int] =
         firstParent[Int].iterator(split, context)
     }.cache()
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/a1fe59da/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
index 32abc65..e1fadde 100644
--- a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
+++ b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
@@ -75,7 +75,8 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
     assert(sc.checkpointFile[Int](parCollection.getCheckpointFile.get).collect() === result)
     assert(parCollection.dependencies != Nil)
     assert(parCollection.partitions.length === numPartitions)
-    assert(parCollection.partitions.toList === parCollection.checkpointData.get.getPartitions.toList)
+    assert(parCollection.partitions.toList ===
+      parCollection.checkpointData.get.getPartitions.toList)
     assert(parCollection.collect() === result)
   }
 
@@ -102,13 +103,13 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
   }
 
   test("UnionRDD") {
-    def otherRDD = sc.makeRDD(1 to 10, 1)
+    def otherRDD: RDD[Int] = sc.makeRDD(1 to 10, 1)
     testRDD(_.union(otherRDD))
     testRDDPartitions(_.union(otherRDD))
   }
 
   test("CartesianRDD") {
-    def otherRDD = sc.makeRDD(1 to 10, 1)
+    def otherRDD: RDD[Int] = sc.makeRDD(1 to 10, 1)
     testRDD(new CartesianRDD(sc, _, otherRDD))
     testRDDPartitions(new CartesianRDD(sc, _, otherRDD))
 
@@ -223,7 +224,8 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
     val partitionAfterCheckpoint =  serializeDeserialize(
       unionRDD.partitions.head.asInstanceOf[PartitionerAwareUnionRDDPartition])
     assert(
-      partitionBeforeCheckpoint.parents.head.getClass != partitionAfterCheckpoint.parents.head.getClass,
+      partitionBeforeCheckpoint.parents.head.getClass !=
+        partitionAfterCheckpoint.parents.head.getClass,
       "PartitionerAwareUnionRDDPartition.parents not updated after parent RDD is checkpointed"
     )
   }
@@ -358,7 +360,7 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
    * Generate an pair RDD (with partitioner) such that both the RDD and its partitions
    * have large size.
    */
-  def generateFatPairRDD() = {
+  def generateFatPairRDD(): RDD[(Int, Int)] = {
     new FatPairRDD(sc.makeRDD(1 to 100, 4), partitioner).mapValues(x => x)
   }
 
@@ -445,7 +447,8 @@ class FatPairRDD(parent: RDD[Int], _partitioner: Partitioner) extends RDD[(Int,
 object CheckpointSuite {
   // This is a custom cogroup function that does not use mapValues like
   // the PairRDDFunctions.cogroup()
-  def cogroup[K, V](first: RDD[(K, V)], second: RDD[(K, V)], part: Partitioner) = {
+  def cogroup[K, V](first: RDD[(K, V)], second: RDD[(K, V)], part: Partitioner)
+    : RDD[(K, Array[Iterable[V]])] = {
     new CoGroupedRDD[K](
       Seq(first.asInstanceOf[RDD[(K, _)]], second.asInstanceOf[RDD[(K, _)]]),
       part

http://git-wip-us.apache.org/repos/asf/spark/blob/a1fe59da/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
index cdfaace..1de169d 100644
--- a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
@@ -64,7 +64,7 @@ abstract class ContextCleanerSuiteBase(val shuffleManager: Class[_] = classOf[Ha
     }
   }
 
-  //------ Helper functions ------
+  // ------ Helper functions ------
 
   protected def newRDD() = sc.makeRDD(1 to 10)
   protected def newPairRDD() = newRDD().map(_ -> 1)
@@ -370,7 +370,7 @@ class CleanerTester(
   val cleanerListener = new CleanerListener {
     def rddCleaned(rddId: Int): Unit = {
       toBeCleanedRDDIds -= rddId
-      logInfo("RDD "+ rddId + " cleaned")
+      logInfo("RDD " + rddId + " cleaned")
     }
 
     def shuffleCleaned(shuffleId: Int): Unit = {

http://git-wip-us.apache.org/repos/asf/spark/blob/a1fe59da/core/src/test/scala/org/apache/spark/FileSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala
index 7acd27c..c8f08ee 100644
--- a/core/src/test/scala/org/apache/spark/FileSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -222,7 +222,7 @@ class FileSuite extends FunSuite with LocalSparkContext {
     val nums = sc.makeRDD(1 to 3).map(x => (new IntWritable(x), new Text("a" * x)))
     nums.saveAsSequenceFile(outputDir)
     val output =
-        sc.newAPIHadoopFile[IntWritable, Text, SequenceFileInputFormat[IntWritable, Text]](outputDir)
+      sc.newAPIHadoopFile[IntWritable, Text, SequenceFileInputFormat[IntWritable, Text]](outputDir)
     assert(output.map(_.toString).collect().toList === List("(1,a)", "(2,aa)", "(3,aaa)"))
   }
 
@@ -451,7 +451,8 @@ class FileSuite extends FunSuite with LocalSparkContext {
 
   test ("prevent user from overwriting the empty directory (new Hadoop API)") {
     sc = new SparkContext("local", "test")
-    val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
+    val randomRDD = sc.parallelize(
+      Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
     intercept[FileAlreadyExistsException] {
       randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](tempDir.getPath)
     }
@@ -459,8 +460,10 @@ class FileSuite extends FunSuite with LocalSparkContext {
 
   test ("prevent user from overwriting the non-empty directory (new Hadoop API)") {
     sc = new SparkContext("local", "test")
-    val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
-    randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](tempDir.getPath + "/output")
+    val randomRDD = sc.parallelize(
+      Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
+    randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](
+      tempDir.getPath + "/output")
     assert(new File(tempDir.getPath + "/output/part-r-00000").exists() === true)
     intercept[FileAlreadyExistsException] {
       randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](tempDir.getPath)
@@ -471,16 +474,20 @@ class FileSuite extends FunSuite with LocalSparkContext {
     val sf = new SparkConf()
     sf.setAppName("test").setMaster("local").set("spark.hadoop.validateOutputSpecs", "false")
     sc = new SparkContext(sf)
-    val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
-    randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](tempDir.getPath + "/output")
+    val randomRDD = sc.parallelize(
+      Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
+    randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](
+      tempDir.getPath + "/output")
     assert(new File(tempDir.getPath + "/output/part-r-00000").exists() === true)
-    randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](tempDir.getPath + "/output")
+    randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](
+      tempDir.getPath + "/output")
     assert(new File(tempDir.getPath + "/output/part-r-00000").exists() === true)
   }
 
   test ("save Hadoop Dataset through old Hadoop API") {
     sc = new SparkContext("local", "test")
-    val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
+    val randomRDD = sc.parallelize(
+      Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
     val job = new JobConf()
     job.setOutputKeyClass(classOf[String])
     job.setOutputValueClass(classOf[String])
@@ -492,7 +499,8 @@ class FileSuite extends FunSuite with LocalSparkContext {
 
   test ("save Hadoop Dataset through new Hadoop API") {
     sc = new SparkContext("local", "test")
-    val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
+    val randomRDD = sc.parallelize(
+      Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
     val job = new Job(sc.hadoopConfiguration)
     job.setOutputKeyClass(classOf[String])
     job.setOutputValueClass(classOf[String])

http://git-wip-us.apache.org/repos/asf/spark/blob/a1fe59da/core/src/test/scala/org/apache/spark/ImplicitOrderingSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ImplicitOrderingSuite.scala b/core/src/test/scala/org/apache/spark/ImplicitOrderingSuite.scala
index d895230..51348c0 100644
--- a/core/src/test/scala/org/apache/spark/ImplicitOrderingSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ImplicitOrderingSuite.scala
@@ -51,7 +51,7 @@ private object ImplicitOrderingSuite {
     override def compare(o: OrderedClass): Int = ???
   }
   
-  def basicMapExpectations(rdd: RDD[Int]) = {
+  def basicMapExpectations(rdd: RDD[Int]): List[(Boolean, String)] = {
     List((rdd.map(x => (x, x)).keyOrdering.isDefined, 
             "rdd.map(x => (x, x)).keyOrdering.isDefined"),
           (rdd.map(x => (1, x)).keyOrdering.isDefined, 
@@ -68,7 +68,7 @@ private object ImplicitOrderingSuite {
             "rdd.map(x => (new OrderedClass, x)).keyOrdering.isDefined"))
   }
   
-  def otherRDDMethodExpectations(rdd: RDD[Int]) = {
+  def otherRDDMethodExpectations(rdd: RDD[Int]): List[(Boolean, String)] = {
     List((rdd.groupBy(x => x).keyOrdering.isDefined, 
            "rdd.groupBy(x => x).keyOrdering.isDefined"),
          (rdd.groupBy(x => new NonOrderedClass).keyOrdering.isEmpty, 
@@ -82,4 +82,4 @@ private object ImplicitOrderingSuite {
          (rdd.groupBy((x: Int) => x, new HashPartitioner(5)).keyOrdering.isDefined,
            "rdd.groupBy((x: Int) => x, new HashPartitioner(5)).keyOrdering.isDefined"))
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/a1fe59da/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
index 21487bc..4d3e097 100644
--- a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
@@ -188,7 +188,7 @@ class JobCancellationSuite extends FunSuite with Matchers with BeforeAndAfter
     val rdd = sc.parallelize(1 to 10, 2).map { i =>
       JobCancellationSuite.twoJobsSharingStageSemaphore.acquire()
       (i, i)
-    }.reduceByKey(_+_)
+    }.reduceByKey(_ + _)
     val f1 = rdd.collectAsync()
     val f2 = rdd.countAsync()
 

http://git-wip-us.apache.org/repos/asf/spark/blob/a1fe59da/core/src/test/scala/org/apache/spark/LocalSparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/LocalSparkContext.scala b/core/src/test/scala/org/apache/spark/LocalSparkContext.scala
index 53e367a..8bf2e55 100644
--- a/core/src/test/scala/org/apache/spark/LocalSparkContext.scala
+++ b/core/src/test/scala/org/apache/spark/LocalSparkContext.scala
@@ -37,7 +37,7 @@ trait LocalSparkContext extends BeforeAndAfterEach with BeforeAndAfterAll { self
     super.afterEach()
   }
 
-  def resetSparkContext() = {
+  def resetSparkContext(): Unit = {
     LocalSparkContext.stop(sc)
     sc = null
   }
@@ -54,7 +54,7 @@ object LocalSparkContext {
   }
 
   /** Runs `f` by passing in `sc` and ensures that `sc` is stopped. */
-  def withSpark[T](sc: SparkContext)(f: SparkContext => T) = {
+  def withSpark[T](sc: SparkContext)(f: SparkContext => T): T = {
     try {
       f(sc)
     } finally {

http://git-wip-us.apache.org/repos/asf/spark/blob/a1fe59da/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/PartitioningSuite.scala b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
index b753231..47e3bf6 100644
--- a/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
+++ b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
@@ -92,7 +92,7 @@ class PartitioningSuite extends FunSuite with SharedSparkContext with PrivateMet
   test("RangePartitioner for keys that are not Comparable (but with Ordering)") {
     // Row does not extend Comparable, but has an implicit Ordering defined.
     implicit object RowOrdering extends Ordering[Row] {
-      override def compare(x: Row, y: Row) = x.value - y.value
+      override def compare(x: Row, y: Row): Int = x.value - y.value
     }
 
     val rdd = sc.parallelize(1 to 4500).map(x => (Row(x), Row(x)))
@@ -212,20 +212,24 @@ class PartitioningSuite extends FunSuite with SharedSparkContext with PrivateMet
     val arrPairs: RDD[(Array[Int], Int)] =
       sc.parallelize(Array(1, 2, 3, 4), 2).map(x => (Array(x), x))
 
-    assert(intercept[SparkException]{ arrs.distinct() }.getMessage.contains("array"))
+    def verify(testFun: => Unit): Unit = {
+      intercept[SparkException](testFun).getMessage.contains("array")
+    }
+
+    verify(arrs.distinct())
     // We can't catch all usages of arrays, since they might occur inside other collections:
     // assert(fails { arrPairs.distinct() })
-    assert(intercept[SparkException]{ arrPairs.partitionBy(new HashPartitioner(2)) }.getMessage.contains("array"))
-    assert(intercept[SparkException]{ arrPairs.join(arrPairs) }.getMessage.contains("array"))
-    assert(intercept[SparkException]{ arrPairs.leftOuterJoin(arrPairs) }.getMessage.contains("array"))
-    assert(intercept[SparkException]{ arrPairs.rightOuterJoin(arrPairs) }.getMessage.contains("array"))
-    assert(intercept[SparkException]{ arrPairs.fullOuterJoin(arrPairs) }.getMessage.contains("array"))
-    assert(intercept[SparkException]{ arrPairs.groupByKey() }.getMessage.contains("array"))
-    assert(intercept[SparkException]{ arrPairs.countByKey() }.getMessage.contains("array"))
-    assert(intercept[SparkException]{ arrPairs.countByKeyApprox(1) }.getMessage.contains("array"))
-    assert(intercept[SparkException]{ arrPairs.cogroup(arrPairs) }.getMessage.contains("array"))
-    assert(intercept[SparkException]{ arrPairs.reduceByKeyLocally(_ + _) }.getMessage.contains("array"))
-    assert(intercept[SparkException]{ arrPairs.reduceByKey(_ + _) }.getMessage.contains("array"))
+    verify(arrPairs.partitionBy(new HashPartitioner(2)))
+    verify(arrPairs.join(arrPairs))
+    verify(arrPairs.leftOuterJoin(arrPairs))
+    verify(arrPairs.rightOuterJoin(arrPairs))
+    verify(arrPairs.fullOuterJoin(arrPairs))
+    verify(arrPairs.groupByKey())
+    verify(arrPairs.countByKey())
+    verify(arrPairs.countByKeyApprox(1))
+    verify(arrPairs.cogroup(arrPairs))
+    verify(arrPairs.reduceByKeyLocally(_ + _))
+    verify(arrPairs.reduceByKey(_ + _))
   }
 
   test("zero-length partitions should be correctly handled") {

http://git-wip-us.apache.org/repos/asf/spark/blob/a1fe59da/core/src/test/scala/org/apache/spark/SSLOptionsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/SSLOptionsSuite.scala b/core/src/test/scala/org/apache/spark/SSLOptionsSuite.scala
index 444a333..93f46ef 100644
--- a/core/src/test/scala/org/apache/spark/SSLOptionsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SSLOptionsSuite.scala
@@ -36,7 +36,8 @@ class SSLOptionsSuite extends FunSuite with BeforeAndAfterAll {
     conf.set("spark.ssl.keyPassword", "password")
     conf.set("spark.ssl.trustStore", trustStorePath)
     conf.set("spark.ssl.trustStorePassword", "password")
-    conf.set("spark.ssl.enabledAlgorithms", "TLS_RSA_WITH_AES_128_CBC_SHA, TLS_RSA_WITH_AES_256_CBC_SHA")
+    conf.set("spark.ssl.enabledAlgorithms",
+      "TLS_RSA_WITH_AES_128_CBC_SHA, TLS_RSA_WITH_AES_256_CBC_SHA")
     conf.set("spark.ssl.protocol", "SSLv3")
 
     val opts = SSLOptions.parse(conf, "spark.ssl")
@@ -52,7 +53,8 @@ class SSLOptionsSuite extends FunSuite with BeforeAndAfterAll {
     assert(opts.keyStorePassword === Some("password"))
     assert(opts.keyPassword === Some("password"))
     assert(opts.protocol === Some("SSLv3"))
-    assert(opts.enabledAlgorithms === Set("TLS_RSA_WITH_AES_128_CBC_SHA", "TLS_RSA_WITH_AES_256_CBC_SHA"))
+    assert(opts.enabledAlgorithms ===
+      Set("TLS_RSA_WITH_AES_128_CBC_SHA", "TLS_RSA_WITH_AES_256_CBC_SHA"))
   }
 
   test("test resolving property with defaults specified ") {
@@ -66,7 +68,8 @@ class SSLOptionsSuite extends FunSuite with BeforeAndAfterAll {
     conf.set("spark.ssl.keyPassword", "password")
     conf.set("spark.ssl.trustStore", trustStorePath)
     conf.set("spark.ssl.trustStorePassword", "password")
-    conf.set("spark.ssl.enabledAlgorithms", "TLS_RSA_WITH_AES_128_CBC_SHA, TLS_RSA_WITH_AES_256_CBC_SHA")
+    conf.set("spark.ssl.enabledAlgorithms",
+      "TLS_RSA_WITH_AES_128_CBC_SHA, TLS_RSA_WITH_AES_256_CBC_SHA")
     conf.set("spark.ssl.protocol", "SSLv3")
 
     val defaultOpts = SSLOptions.parse(conf, "spark.ssl", defaults = None)
@@ -83,7 +86,8 @@ class SSLOptionsSuite extends FunSuite with BeforeAndAfterAll {
     assert(opts.keyStorePassword === Some("password"))
     assert(opts.keyPassword === Some("password"))
     assert(opts.protocol === Some("SSLv3"))
-    assert(opts.enabledAlgorithms === Set("TLS_RSA_WITH_AES_128_CBC_SHA", "TLS_RSA_WITH_AES_256_CBC_SHA"))
+    assert(opts.enabledAlgorithms ===
+      Set("TLS_RSA_WITH_AES_128_CBC_SHA", "TLS_RSA_WITH_AES_256_CBC_SHA"))
   }
 
   test("test whether defaults can be overridden ") {
@@ -99,7 +103,8 @@ class SSLOptionsSuite extends FunSuite with BeforeAndAfterAll {
     conf.set("spark.ssl.keyPassword", "password")
     conf.set("spark.ssl.trustStore", trustStorePath)
     conf.set("spark.ssl.trustStorePassword", "password")
-    conf.set("spark.ssl.enabledAlgorithms", "TLS_RSA_WITH_AES_128_CBC_SHA, TLS_RSA_WITH_AES_256_CBC_SHA")
+    conf.set("spark.ssl.enabledAlgorithms",
+      "TLS_RSA_WITH_AES_128_CBC_SHA, TLS_RSA_WITH_AES_256_CBC_SHA")
     conf.set("spark.ui.ssl.enabledAlgorithms", "ABC, DEF")
     conf.set("spark.ssl.protocol", "SSLv3")
 

http://git-wip-us.apache.org/repos/asf/spark/blob/a1fe59da/core/src/test/scala/org/apache/spark/SSLSampleConfigs.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/SSLSampleConfigs.scala b/core/src/test/scala/org/apache/spark/SSLSampleConfigs.scala
index ace8123..308b9ea 100644
--- a/core/src/test/scala/org/apache/spark/SSLSampleConfigs.scala
+++ b/core/src/test/scala/org/apache/spark/SSLSampleConfigs.scala
@@ -21,10 +21,11 @@ import java.io.File
 
 object SSLSampleConfigs {
   val keyStorePath = new File(this.getClass.getResource("/keystore").toURI).getAbsolutePath
-  val untrustedKeyStorePath = new File(this.getClass.getResource("/untrusted-keystore").toURI).getAbsolutePath
+  val untrustedKeyStorePath = new File(
+    this.getClass.getResource("/untrusted-keystore").toURI).getAbsolutePath
   val trustStorePath = new File(this.getClass.getResource("/truststore").toURI).getAbsolutePath
 
-  def sparkSSLConfig() = {
+  def sparkSSLConfig(): SparkConf = {
     val conf = new SparkConf(loadDefaults = false)
     conf.set("spark.ssl.enabled", "true")
     conf.set("spark.ssl.keyStore", keyStorePath)
@@ -38,7 +39,7 @@ object SSLSampleConfigs {
     conf
   }
 
-  def sparkSSLConfigUntrusted() = {
+  def sparkSSLConfigUntrusted(): SparkConf = {
     val conf = new SparkConf(loadDefaults = false)
     conf.set("spark.ssl.enabled", "true")
     conf.set("spark.ssl.keyStore", untrustedKeyStorePath)

http://git-wip-us.apache.org/repos/asf/spark/blob/a1fe59da/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
index 30b6184..d718051 100644
--- a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
@@ -142,7 +142,7 @@ abstract class ShuffleSuite extends FunSuite with Matchers with LocalSparkContex
   test("shuffle on mutable pairs") {
     // Use a local cluster with 2 processes to make sure there are both local and remote blocks
     sc = new SparkContext("local-cluster[2,1,512]", "test", conf)
-    def p[T1, T2](_1: T1, _2: T2) = MutablePair(_1, _2)
+    def p[T1, T2](_1: T1, _2: T2): MutablePair[T1, T2] = MutablePair(_1, _2)
     val data = Array(p(1, 1), p(1, 2), p(1, 3), p(2, 1))
     val pairs: RDD[MutablePair[Int, Int]] = sc.parallelize(data, 2)
     val results = new ShuffledRDD[Int, Int, Int](pairs,
@@ -155,7 +155,7 @@ abstract class ShuffleSuite extends FunSuite with Matchers with LocalSparkContex
     // This is not in SortingSuite because of the local cluster setup.
     // Use a local cluster with 2 processes to make sure there are both local and remote blocks
     sc = new SparkContext("local-cluster[2,1,512]", "test", conf)
-    def p[T1, T2](_1: T1, _2: T2) = MutablePair(_1, _2)
+    def p[T1, T2](_1: T1, _2: T2): MutablePair[T1, T2] = MutablePair(_1, _2)
     val data = Array(p(1, 11), p(3, 33), p(100, 100), p(2, 22))
     val pairs: RDD[MutablePair[Int, Int]] = sc.parallelize(data, 2)
     val results = new OrderedRDDFunctions[Int, Int, MutablePair[Int, Int]](pairs)
@@ -169,7 +169,7 @@ abstract class ShuffleSuite extends FunSuite with Matchers with LocalSparkContex
   test("cogroup using mutable pairs") {
     // Use a local cluster with 2 processes to make sure there are both local and remote blocks
     sc = new SparkContext("local-cluster[2,1,512]", "test", conf)
-    def p[T1, T2](_1: T1, _2: T2) = MutablePair(_1, _2)
+    def p[T1, T2](_1: T1, _2: T2): MutablePair[T1, T2] = MutablePair(_1, _2)
     val data1 = Seq(p(1, 1), p(1, 2), p(1, 3), p(2, 1))
     val data2 = Seq(p(1, "11"), p(1, "12"), p(2, "22"), p(3, "3"))
     val pairs1: RDD[MutablePair[Int, Int]] = sc.parallelize(data1, 2)
@@ -196,7 +196,7 @@ abstract class ShuffleSuite extends FunSuite with Matchers with LocalSparkContex
   test("subtract mutable pairs") {
     // Use a local cluster with 2 processes to make sure there are both local and remote blocks
     sc = new SparkContext("local-cluster[2,1,512]", "test", conf)
-    def p[T1, T2](_1: T1, _2: T2) = MutablePair(_1, _2)
+    def p[T1, T2](_1: T1, _2: T2): MutablePair[T1, T2] = MutablePair(_1, _2)
     val data1 = Seq(p(1, 1), p(1, 2), p(1, 3), p(2, 1), p(3, 33))
     val data2 = Seq(p(1, "11"), p(1, "12"), p(2, "22"))
     val pairs1: RDD[MutablePair[Int, Int]] = sc.parallelize(data1, 2)

http://git-wip-us.apache.org/repos/asf/spark/blob/a1fe59da/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index c7301a3..94be1c6 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -114,11 +114,13 @@ class SparkContextSuite extends FunSuite with LocalSparkContext {
 
         if (length1 != gotten1.length()) {
           throw new SparkException(
-            s"file has different length $length1 than added file ${gotten1.length()} : " + absolutePath1)
+            s"file has different length $length1 than added file ${gotten1.length()} : " +
+              absolutePath1)
         }
         if (length2 != gotten2.length()) {
           throw new SparkException(
-            s"file has different length $length2 than added file ${gotten2.length()} : " + absolutePath2)
+            s"file has different length $length2 than added file ${gotten2.length()} : " +
+              absolutePath2)
         }
 
         if (absolutePath1 == gotten1.getAbsolutePath) {

http://git-wip-us.apache.org/repos/asf/spark/blob/a1fe59da/core/src/test/scala/org/apache/spark/StatusTrackerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/StatusTrackerSuite.scala b/core/src/test/scala/org/apache/spark/StatusTrackerSuite.scala
index 41d6ea2..084eb23 100644
--- a/core/src/test/scala/org/apache/spark/StatusTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/StatusTrackerSuite.scala
@@ -82,7 +82,8 @@ class StatusTrackerSuite extends FunSuite with Matchers with LocalSparkContext {
       secondJobFuture.jobIds.head
     }
     eventually(timeout(10 seconds)) {
-      sc.statusTracker.getJobIdsForGroup("my-job-group").toSet should be (Set(firstJobId, secondJobId))
+      sc.statusTracker.getJobIdsForGroup("my-job-group").toSet should be (
+        Set(firstJobId, secondJobId))
     }
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/a1fe59da/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
index af32726..c8fdfa6 100644
--- a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
+++ b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
@@ -33,7 +33,7 @@ class DummyBroadcastClass(rdd: RDD[Int]) extends Serializable {
   val broadcast = rdd.context.broadcast(list)
   val bid = broadcast.id
 
-  def doSomething() = {
+  def doSomething(): Set[(Int, Boolean)] = {
     rdd.map { x =>
       val bm = SparkEnv.get.blockManager
       // Check if broadcast block was fetched

http://git-wip-us.apache.org/repos/asf/spark/blob/a1fe59da/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
index 68b5776..2071701 100644
--- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
@@ -100,13 +100,13 @@ class JsonProtocolSuite extends FunSuite {
     appInfo
   }
 
-  def createDriverCommand() = new Command(
+  def createDriverCommand(): Command = new Command(
     "org.apache.spark.FakeClass", Seq("some arg --and-some options -g foo"),
     Map(("K1", "V1"), ("K2", "V2")), Seq("cp1", "cp2"), Seq("lp1", "lp2"), Seq("-Dfoo")
   )
 
-  def createDriverDesc() = new DriverDescription("hdfs://some-dir/some.jar", 100, 3,
-    false, createDriverCommand())
+  def createDriverDesc(): DriverDescription =
+    new DriverDescription("hdfs://some-dir/some.jar", 100, 3, false, createDriverCommand())
 
   def createDriverInfo(): DriverInfo = new DriverInfo(3, "driver-3",
     createDriverDesc(), new Date())

http://git-wip-us.apache.org/repos/asf/spark/blob/a1fe59da/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala b/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala
index 54dd7c9..9cdb428 100644
--- a/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala
@@ -56,7 +56,7 @@ class LogUrlsStandaloneSuite extends FunSuite with LocalSparkContext {
   test("verify that log urls reflect SPARK_PUBLIC_DNS (SPARK-6175)") {
     val SPARK_PUBLIC_DNS = "public_dns"
     class MySparkConf extends SparkConf(false) {
-      override def getenv(name: String) = {
+      override def getenv(name: String): String = {
         if (name == "SPARK_PUBLIC_DNS") SPARK_PUBLIC_DNS
         else super.getenv(name)
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/a1fe59da/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
index 3a9963a..20de46f 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
@@ -42,10 +42,10 @@ class HistoryServerSuite extends FunSuite with Matchers with MockitoSugar {
     when(historyServer.getProviderConfig()).thenReturn(Map[String, String]())
     val page = new HistoryPage(historyServer)
 
-    //when
+    // when
     val response = page.render(request)
 
-    //then
+    // then
     val links = response \\ "a"
     val justHrefs = for {
       l <- links

http://git-wip-us.apache.org/repos/asf/spark/blob/a1fe59da/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
index 2fa90e3..8e09976 100644
--- a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
@@ -508,7 +508,7 @@ private class DummyMaster(
     exception: Option[Exception] = None)
   extends Actor {
 
-  override def receive = {
+  override def receive: PartialFunction[Any, Unit] = {
     case RequestSubmitDriver(driverDesc) =>
       sender ! SubmitDriverResponse(success = true, Some(submitId), submitMessage)
     case RequestKillDriver(driverId) =>
@@ -531,7 +531,7 @@ private class SmarterMaster extends Actor {
   private var counter: Int = 0
   private val submittedDrivers = new mutable.HashMap[String, DriverState]
 
-  override def receive = {
+  override def receive: PartialFunction[Any, Unit] = {
     case RequestSubmitDriver(driverDesc) =>
       val driverId = s"driver-$counter"
       submittedDrivers(driverId) = RUNNING

http://git-wip-us.apache.org/repos/asf/spark/blob/a1fe59da/core/src/test/scala/org/apache/spark/deploy/rest/SubmitRestProtocolSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/rest/SubmitRestProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/rest/SubmitRestProtocolSuite.scala
index 1d64ec2..61071ee 100644
--- a/core/src/test/scala/org/apache/spark/deploy/rest/SubmitRestProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/rest/SubmitRestProtocolSuite.scala
@@ -129,7 +129,8 @@ class SubmitRestProtocolSuite extends FunSuite {
     assert(newMessage.sparkProperties("spark.files") === "fireball.png")
     assert(newMessage.sparkProperties("spark.driver.memory") === "512m")
     assert(newMessage.sparkProperties("spark.driver.cores") === "180")
-    assert(newMessage.sparkProperties("spark.driver.extraJavaOptions") === " -Dslices=5 -Dcolor=mostly_red")
+    assert(newMessage.sparkProperties("spark.driver.extraJavaOptions") ===
+      " -Dslices=5 -Dcolor=mostly_red")
     assert(newMessage.sparkProperties("spark.driver.extraClassPath") === "food-coloring.jar")
     assert(newMessage.sparkProperties("spark.driver.extraLibraryPath") === "pickle.jar")
     assert(newMessage.sparkProperties("spark.driver.supervise") === "false")

http://git-wip-us.apache.org/repos/asf/spark/blob/a1fe59da/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
index 6fca632..a8b9df2 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
@@ -35,7 +35,8 @@ class ExecutorRunnerTest extends FunSuite {
     val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321", 123,
       "publicAddr", new File(sparkHome), new File("ooga"), "blah", new SparkConf, Seq("localDir"),
       ExecutorState.RUNNING)
-    val builder = CommandUtils.buildProcessBuilder(appDesc.command, 512, sparkHome, er.substituteVariables)
+    val builder = CommandUtils.buildProcessBuilder(
+      appDesc.command, 512, sparkHome, er.substituteVariables)
     assert(builder.command().last === appId)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/a1fe59da/core/src/test/scala/org/apache/spark/deploy/worker/WorkerArgumentsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerArgumentsTest.scala b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerArgumentsTest.scala
index 372d7aa..7cc2104 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerArgumentsTest.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerArgumentsTest.scala
@@ -37,7 +37,7 @@ class WorkerArgumentsTest extends FunSuite {
     val args = Array("spark://localhost:0000  ")
 
     class MySparkConf extends SparkConf(false) {
-      override def getenv(name: String) = {
+      override def getenv(name: String): String = {
         if (name == "SPARK_WORKER_MEMORY") "50000"
         else super.getenv(name)
       }
@@ -56,7 +56,7 @@ class WorkerArgumentsTest extends FunSuite {
     val args = Array("spark://localhost:0000  ")
 
     class MySparkConf extends SparkConf(false) {
-      override def getenv(name: String) = {
+      override def getenv(name: String): String = {
         if (name == "SPARK_WORKER_MEMORY") "5G"
         else super.getenv(name)
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/a1fe59da/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala
index 84e2fd7..450fba2 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala
@@ -24,8 +24,10 @@ import org.scalatest.{Matchers, FunSuite}
 
 class WorkerSuite extends FunSuite with Matchers {
 
-  def cmd(javaOpts: String*) = Command("", Seq.empty, Map.empty, Seq.empty, Seq.empty, Seq(javaOpts:_*))
-  def conf(opts: (String, String)*) = new SparkConf(loadDefaults = false).setAll(opts)
+  def cmd(javaOpts: String*): Command = {
+    Command("", Seq.empty, Map.empty, Seq.empty, Seq.empty, Seq(javaOpts:_*))
+  }
+  def conf(opts: (String, String)*): SparkConf = new SparkConf(loadDefaults = false).setAll(opts)
 
   test("test isUseLocalNodeSSLConfig") {
     Worker.isUseLocalNodeSSLConfig(cmd("-Dasdf=dfgh")) shouldBe false

http://git-wip-us.apache.org/repos/asf/spark/blob/a1fe59da/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
index 78fa98a..190b08d 100644
--- a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
@@ -238,7 +238,7 @@ class InputOutputMetricsSuite extends FunSuite with SharedSparkContext
 
     sc.textFile(tmpFilePath, 4)
       .map(key => (key, 1))
-      .reduceByKey(_+_)
+      .reduceByKey(_ + _)
       .saveAsTextFile("file://" + tmpFile.getAbsolutePath)
 
     sc.listenerBus.waitUntilEmpty(500)

http://git-wip-us.apache.org/repos/asf/spark/blob/a1fe59da/core/src/test/scala/org/apache/spark/metrics/MetricsConfigSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/metrics/MetricsConfigSuite.scala b/core/src/test/scala/org/apache/spark/metrics/MetricsConfigSuite.scala
index 37e5284..100ac77 100644
--- a/core/src/test/scala/org/apache/spark/metrics/MetricsConfigSuite.scala
+++ b/core/src/test/scala/org/apache/spark/metrics/MetricsConfigSuite.scala
@@ -35,7 +35,8 @@ class MetricsConfigSuite extends FunSuite with BeforeAndAfter {
 
     val property = conf.getInstance("random")
     assert(property.size() === 2)
-    assert(property.getProperty("sink.servlet.class") === "org.apache.spark.metrics.sink.MetricsServlet")
+    assert(property.getProperty("sink.servlet.class") ===
+      "org.apache.spark.metrics.sink.MetricsServlet")
     assert(property.getProperty("sink.servlet.path") === "/metrics/json")
   }
 
@@ -47,16 +48,20 @@ class MetricsConfigSuite extends FunSuite with BeforeAndAfter {
     assert(masterProp.size() === 5)
     assert(masterProp.getProperty("sink.console.period") === "20")
     assert(masterProp.getProperty("sink.console.unit") === "minutes")
-    assert(masterProp.getProperty("source.jvm.class") === "org.apache.spark.metrics.source.JvmSource")
-    assert(masterProp.getProperty("sink.servlet.class") === "org.apache.spark.metrics.sink.MetricsServlet")
+    assert(masterProp.getProperty("source.jvm.class") ===
+      "org.apache.spark.metrics.source.JvmSource")
+    assert(masterProp.getProperty("sink.servlet.class") ===
+      "org.apache.spark.metrics.sink.MetricsServlet")
     assert(masterProp.getProperty("sink.servlet.path") === "/metrics/master/json")
 
     val workerProp = conf.getInstance("worker")
     assert(workerProp.size() === 5)
     assert(workerProp.getProperty("sink.console.period") === "10")
     assert(workerProp.getProperty("sink.console.unit") === "seconds")
-    assert(workerProp.getProperty("source.jvm.class") === "org.apache.spark.metrics.source.JvmSource")
-    assert(workerProp.getProperty("sink.servlet.class") === "org.apache.spark.metrics.sink.MetricsServlet")
+    assert(workerProp.getProperty("source.jvm.class") ===
+      "org.apache.spark.metrics.source.JvmSource")
+    assert(workerProp.getProperty("sink.servlet.class") ===
+      "org.apache.spark.metrics.sink.MetricsServlet")
     assert(workerProp.getProperty("sink.servlet.path") === "/metrics/json")
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/a1fe59da/core/src/test/scala/org/apache/spark/rdd/JdbcRDDSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/rdd/JdbcRDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/JdbcRDDSuite.scala
index 0dc5988..be84673 100644
--- a/core/src/test/scala/org/apache/spark/rdd/JdbcRDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/JdbcRDDSuite.scala
@@ -80,7 +80,7 @@ class JdbcRDDSuite extends FunSuite with BeforeAndAfter with LocalSparkContext {
       (r: ResultSet) => { r.getInt(1) } ).cache()
 
     assert(rdd.count === 100)
-    assert(rdd.reduce(_+_) === 10100)
+    assert(rdd.reduce(_ + _) === 10100)
   }
   
   test("large id overflow") {
@@ -92,7 +92,7 @@ class JdbcRDDSuite extends FunSuite with BeforeAndAfter with LocalSparkContext {
       1131544775L, 567279358897692673L, 20,
       (r: ResultSet) => { r.getInt(1) } ).cache()
     assert(rdd.count === 100)
-    assert(rdd.reduce(_+_) === 5050)
+    assert(rdd.reduce(_ + _) === 5050)
   }
 
   after {

http://git-wip-us.apache.org/repos/asf/spark/blob/a1fe59da/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
index 108f70a..ca0d953 100644
--- a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
@@ -168,13 +168,13 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
 
   test("reduceByKey") {
     val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1)))
-    val sums = pairs.reduceByKey(_+_).collect()
+    val sums = pairs.reduceByKey(_ + _).collect()
     assert(sums.toSet === Set((1, 7), (2, 1)))
   }
 
   test("reduceByKey with collectAsMap") {
     val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1)))
-    val sums = pairs.reduceByKey(_+_).collectAsMap()
+    val sums = pairs.reduceByKey(_ + _).collectAsMap()
     assert(sums.size === 2)
     assert(sums(1) === 7)
     assert(sums(2) === 1)
@@ -182,7 +182,7 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
 
   test("reduceByKey with many output partitons") {
     val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1)))
-    val sums = pairs.reduceByKey(_+_, 10).collect()
+    val sums = pairs.reduceByKey(_ + _, 10).collect()
     assert(sums.toSet === Set((1, 7), (2, 1)))
   }
 
@@ -192,7 +192,7 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
       def getPartition(key: Any) = key.asInstanceOf[Int]
     }
     val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 1), (0, 1))).partitionBy(p)
-    val sums = pairs.reduceByKey(_+_)
+    val sums = pairs.reduceByKey(_ + _)
     assert(sums.collect().toSet === Set((1, 4), (0, 1)))
     assert(sums.partitioner === Some(p))
     // count the dependencies to make sure there is only 1 ShuffledRDD
@@ -208,7 +208,7 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
   }
 
   test("countApproxDistinctByKey") {
-    def error(est: Long, size: Long) = math.abs(est - size) / size.toDouble
+    def error(est: Long, size: Long): Double = math.abs(est - size) / size.toDouble
 
     /* Since HyperLogLog unique counting is approximate, and the relative standard deviation is
      * only a statistical bound, the tests can fail for large values of relativeSD. We will be using
@@ -465,7 +465,7 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
 
   test("foldByKey") {
     val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1)))
-    val sums = pairs.foldByKey(0)(_+_).collect()
+    val sums = pairs.foldByKey(0)(_ + _).collect()
     assert(sums.toSet === Set((1, 7), (2, 1)))
   }
 
@@ -505,7 +505,8 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
     conf.setOutputCommitter(classOf[FakeOutputCommitter])
 
     FakeOutputCommitter.ran = false
-    pairs.saveAsHadoopFile("ignored", pairs.keyClass, pairs.valueClass, classOf[FakeOutputFormat], conf)
+    pairs.saveAsHadoopFile(
+      "ignored", pairs.keyClass, pairs.valueClass, classOf[FakeOutputFormat], conf)
 
     assert(FakeOutputCommitter.ran, "OutputCommitter was never called")
   }
@@ -552,7 +553,7 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
   }
 
   private object StratifiedAuxiliary {
-    def stratifier (fractionPositive: Double) = {
+    def stratifier (fractionPositive: Double): (Int) => String = {
       (x: Int) => if (x % 10 < (10 * fractionPositive).toInt) "1" else "0"
     }
 
@@ -572,7 +573,7 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
     def testSampleExact(stratifiedData: RDD[(String, Int)],
         samplingRate: Double,
         seed: Long,
-        n: Long) = {
+        n: Long): Unit = {
       testBernoulli(stratifiedData, true, samplingRate, seed, n)
       testPoisson(stratifiedData, true, samplingRate, seed, n)
     }
@@ -580,7 +581,7 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
     def testSample(stratifiedData: RDD[(String, Int)],
         samplingRate: Double,
         seed: Long,
-        n: Long) = {
+        n: Long): Unit = {
       testBernoulli(stratifiedData, false, samplingRate, seed, n)
       testPoisson(stratifiedData, false, samplingRate, seed, n)
     }
@@ -590,7 +591,7 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
         exact: Boolean,
         samplingRate: Double,
         seed: Long,
-        n: Long) = {
+        n: Long): Unit = {
       val expectedSampleSize = stratifiedData.countByKey()
         .mapValues(count => math.ceil(count * samplingRate).toInt)
       val fractions = Map("1" -> samplingRate, "0" -> samplingRate)
@@ -612,7 +613,7 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
         exact: Boolean,
         samplingRate: Double,
         seed: Long,
-        n: Long) = {
+        n: Long): Unit = {
       val expectedSampleSize = stratifiedData.countByKey().mapValues(count =>
         math.ceil(count * samplingRate).toInt)
       val fractions = Map("1" -> samplingRate, "0" -> samplingRate)
@@ -701,27 +702,27 @@ class FakeOutputFormat() extends OutputFormat[Integer, Integer]() {
  */
 class NewFakeWriter extends NewRecordWriter[Integer, Integer] {
 
-  def close(p1: NewTaskAttempContext) = ()
+  def close(p1: NewTaskAttempContext): Unit = ()
 
-  def write(p1: Integer, p2: Integer) = ()
+  def write(p1: Integer, p2: Integer): Unit = ()
 
 }
 
 class NewFakeCommitter extends NewOutputCommitter {
-  def setupJob(p1: NewJobContext) = ()
+  def setupJob(p1: NewJobContext): Unit = ()
 
   def needsTaskCommit(p1: NewTaskAttempContext): Boolean = false
 
-  def setupTask(p1: NewTaskAttempContext) = ()
+  def setupTask(p1: NewTaskAttempContext): Unit = ()
 
-  def commitTask(p1: NewTaskAttempContext) = ()
+  def commitTask(p1: NewTaskAttempContext): Unit = ()
 
-  def abortTask(p1: NewTaskAttempContext) = ()
+  def abortTask(p1: NewTaskAttempContext): Unit = ()
 }
 
 class NewFakeFormat() extends NewOutputFormat[Integer, Integer]() {
 
-  def checkOutputSpecs(p1: NewJobContext)  = ()
+  def checkOutputSpecs(p1: NewJobContext): Unit = ()
 
   def getRecordWriter(p1: NewTaskAttempContext): NewRecordWriter[Integer, Integer] = {
     new NewFakeWriter()
@@ -735,7 +736,7 @@ class NewFakeFormat() extends NewOutputFormat[Integer, Integer]() {
 class ConfigTestFormat() extends NewFakeFormat() with Configurable {
 
   var setConfCalled = false
-  def setConf(p1: Configuration) = {
+  def setConf(p1: Configuration): Unit = {
     setConfCalled = true
     ()
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/a1fe59da/core/src/test/scala/org/apache/spark/rdd/ParallelCollectionSplitSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/rdd/ParallelCollectionSplitSuite.scala b/core/src/test/scala/org/apache/spark/rdd/ParallelCollectionSplitSuite.scala
index cd193ae..1880364 100644
--- a/core/src/test/scala/org/apache/spark/rdd/ParallelCollectionSplitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/ParallelCollectionSplitSuite.scala
@@ -100,7 +100,7 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers {
     val data = 1 until 100
     val slices = ParallelCollectionRDD.slice(data, 3)
     assert(slices.size === 3)
-    assert(slices.map(_.size).reduceLeft(_+_) === 99)
+    assert(slices.map(_.size).reduceLeft(_ + _) === 99)
     assert(slices.forall(_.isInstanceOf[Range]))
   }
 
@@ -108,7 +108,7 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers {
     val data = 1 to 100
     val slices = ParallelCollectionRDD.slice(data, 3)
     assert(slices.size === 3)
-    assert(slices.map(_.size).reduceLeft(_+_) === 100)
+    assert(slices.map(_.size).reduceLeft(_ + _) === 100)
     assert(slices.forall(_.isInstanceOf[Range]))
   }
 
@@ -139,7 +139,7 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers {
       assert(slices(i).isInstanceOf[Range])
       val range = slices(i).asInstanceOf[Range]
       assert(range.start === i * (N / 40), "slice " + i + " start")
-      assert(range.end   === (i+1) * (N / 40), "slice " + i + " end")
+      assert(range.end   === (i + 1) * (N / 40), "slice " + i + " end")
       assert(range.step  === 1, "slice " + i + " step")
     }
   }
@@ -156,7 +156,7 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers {
         val slices = ParallelCollectionRDD.slice(d, n)
         ("n slices"    |: slices.size == n) &&
         ("concat to d" |: Seq.concat(slices: _*).mkString(",") == d.mkString(",")) &&
-        ("equal sizes" |: slices.map(_.size).forall(x => x==d.size/n || x==d.size/n+1))
+        ("equal sizes" |: slices.map(_.size).forall(x => x == d.size / n || x == d.size /n + 1))
     }
     check(prop)
   }
@@ -174,7 +174,7 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers {
         ("n slices"    |: slices.size == n) &&
         ("all ranges"  |: slices.forall(_.isInstanceOf[Range])) &&
         ("concat to d" |: Seq.concat(slices: _*).mkString(",") == d.mkString(",")) &&
-        ("equal sizes" |: slices.map(_.size).forall(x => x==d.size/n || x==d.size/n+1))
+        ("equal sizes" |: slices.map(_.size).forall(x => x == d.size / n || x == d.size / n + 1))
     }
     check(prop)
   }
@@ -192,7 +192,7 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers {
         ("n slices"    |: slices.size == n) &&
         ("all ranges"  |: slices.forall(_.isInstanceOf[Range])) &&
         ("concat to d" |: Seq.concat(slices: _*).mkString(",") == d.mkString(",")) &&
-        ("equal sizes" |: slices.map(_.size).forall(x => x==d.size/n || x==d.size/n+1))
+        ("equal sizes" |: slices.map(_.size).forall(x => x == d.size / n || x == d.size / n + 1))
     }
     check(prop)
   }
@@ -201,7 +201,7 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers {
     val data = 1L until 100L
     val slices = ParallelCollectionRDD.slice(data, 3)
     assert(slices.size === 3)
-    assert(slices.map(_.size).reduceLeft(_+_) === 99)
+    assert(slices.map(_.size).reduceLeft(_ + _) === 99)
     assert(slices.forall(_.isInstanceOf[NumericRange[_]]))
   }
 
@@ -209,7 +209,7 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers {
     val data = 1L to 100L
     val slices = ParallelCollectionRDD.slice(data, 3)
     assert(slices.size === 3)
-    assert(slices.map(_.size).reduceLeft(_+_) === 100)
+    assert(slices.map(_.size).reduceLeft(_ + _) === 100)
     assert(slices.forall(_.isInstanceOf[NumericRange[_]]))
   }
 
@@ -217,7 +217,7 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers {
     val data = 1.0 until 100.0 by 1.0
     val slices = ParallelCollectionRDD.slice(data, 3)
     assert(slices.size === 3)
-    assert(slices.map(_.size).reduceLeft(_+_) === 99)
+    assert(slices.map(_.size).reduceLeft(_ + _) === 99)
     assert(slices.forall(_.isInstanceOf[NumericRange[_]]))
   }
 
@@ -225,7 +225,7 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers {
     val data = 1.0 to 100.0 by 1.0
     val slices = ParallelCollectionRDD.slice(data, 3)
     assert(slices.size === 3)
-    assert(slices.map(_.size).reduceLeft(_+_) === 100)
+    assert(slices.map(_.size).reduceLeft(_ + _) === 100)
     assert(slices.forall(_.isInstanceOf[NumericRange[_]]))
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/a1fe59da/core/src/test/scala/org/apache/spark/rdd/PartitionPruningRDDSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/rdd/PartitionPruningRDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PartitionPruningRDDSuite.scala
index 8408d7e..465068c 100644
--- a/core/src/test/scala/org/apache/spark/rdd/PartitionPruningRDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/PartitionPruningRDDSuite.scala
@@ -23,7 +23,6 @@ import org.apache.spark.{Partition, SharedSparkContext, TaskContext}
 
 class PartitionPruningRDDSuite extends FunSuite with SharedSparkContext {
 
-
   test("Pruned Partitions inherit locality prefs correctly") {
 
     val rdd = new RDD[Int](sc, Nil) {
@@ -74,8 +73,6 @@ class PartitionPruningRDDSuite extends FunSuite with SharedSparkContext {
 }
 
 class TestPartition(i: Int, value: Int) extends Partition with Serializable {
-  def index = i
-
-  def testValue = this.value
-
+  def index: Int = i
+  def testValue: Int = this.value
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/a1fe59da/core/src/test/scala/org/apache/spark/rdd/PartitionwiseSampledRDDSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/rdd/PartitionwiseSampledRDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PartitionwiseSampledRDDSuite.scala
index a048388..0d1369c 100644
--- a/core/src/test/scala/org/apache/spark/rdd/PartitionwiseSampledRDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/PartitionwiseSampledRDDSuite.scala
@@ -35,7 +35,7 @@ class MockSampler extends RandomSampler[Long, Long] {
     Iterator(s)
   }
 
-  override def clone = new MockSampler
+  override def clone: MockSampler = new MockSampler
 }
 
 class PartitionwiseSampledRDDSuite extends FunSuite with SharedSparkContext {

http://git-wip-us.apache.org/repos/asf/spark/blob/a1fe59da/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index bede1ff..df42faa 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -82,7 +82,7 @@ class RDDSuite extends FunSuite with SharedSparkContext {
 
   test("countApproxDistinct") {
 
-    def error(est: Long, size: Long) = math.abs(est - size) / size.toDouble
+    def error(est: Long, size: Long): Double = math.abs(est - size) / size.toDouble
 
     val size = 1000
     val uniformDistro = for (i <- 1 to 5000) yield i % size
@@ -100,7 +100,7 @@ class RDDSuite extends FunSuite with SharedSparkContext {
   }
 
   test("partitioner aware union") {
-    def makeRDDWithPartitioner(seq: Seq[Int]) = {
+    def makeRDDWithPartitioner(seq: Seq[Int]): RDD[Int] = {
       sc.makeRDD(seq, 1)
         .map(x => (x, null))
         .partitionBy(new HashPartitioner(2))
@@ -159,8 +159,8 @@ class RDDSuite extends FunSuite with SharedSparkContext {
 
   test("treeAggregate") {
     val rdd = sc.makeRDD(-1000 until 1000, 10)
-    def seqOp = (c: Long, x: Int) => c + x
-    def combOp = (c1: Long, c2: Long) => c1 + c2
+    def seqOp: (Long, Int) => Long = (c: Long, x: Int) => c + x
+    def combOp: (Long, Long) => Long = (c1: Long, c2: Long) => c1 + c2
     for (depth <- 1 until 10) {
       val sum = rdd.treeAggregate(0L)(seqOp, combOp, depth)
       assert(sum === -1000L)
@@ -204,7 +204,7 @@ class RDDSuite extends FunSuite with SharedSparkContext {
     assert(empty.collect().size === 0)
 
     val thrown = intercept[UnsupportedOperationException]{
-      empty.reduce(_+_)
+      empty.reduce(_ + _)
     }
     assert(thrown.getMessage.contains("empty"))
 
@@ -321,7 +321,7 @@ class RDDSuite extends FunSuite with SharedSparkContext {
     assert(list3.sorted === Array("a","b","c"), "Locality preferences are dropped")
 
     // RDD with locality preferences spread (non-randomly) over 6 machines, m0 through m5
-    val data = sc.makeRDD((1 to 9).map(i => (i, (i to (i+2)).map{ j => "m" + (j%6)})))
+    val data = sc.makeRDD((1 to 9).map(i => (i, (i to (i + 2)).map{ j => "m" + (j%6)})))
     val coalesced1 = data.coalesce(3)
     assert(coalesced1.collect().toList.sorted === (1 to 9).toList, "Data got *lost* in coalescing")
 
@@ -921,15 +921,17 @@ class RDDSuite extends FunSuite with SharedSparkContext {
   test("task serialization exception should not hang scheduler") {
     class BadSerializable extends Serializable {
       @throws(classOf[IOException])
-      private def writeObject(out: ObjectOutputStream): Unit = throw new KryoException("Bad serialization")
+      private def writeObject(out: ObjectOutputStream): Unit =
+        throw new KryoException("Bad serialization")
 
       @throws(classOf[IOException])
       private def readObject(in: ObjectInputStream): Unit = {}
     }
-    // Note that in the original bug, SPARK-4349, that this verifies, the job would only hang if there were
-    // more threads in the Spark Context than there were number of objects in this sequence.
+    // Note that in the original bug, SPARK-4349, that this verifies, the job would only hang if
+    // there were more threads in the Spark Context than there were number of objects in this
+    // sequence.
     intercept[Throwable] {
-      sc.parallelize(Seq(new BadSerializable, new BadSerializable)).collect
+      sc.parallelize(Seq(new BadSerializable, new BadSerializable)).collect()
     }
     // Check that the context has not crashed
     sc.parallelize(1 to 100).map(x => x*2).collect

http://git-wip-us.apache.org/repos/asf/spark/blob/a1fe59da/core/src/test/scala/org/apache/spark/rdd/RDDSuiteUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuiteUtils.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuiteUtils.scala
index 4762fc1..fe695d8 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuiteUtils.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuiteUtils.scala
@@ -21,11 +21,11 @@ object RDDSuiteUtils {
   case class Person(first: String, last: String, age: Int)
 
   object AgeOrdering extends Ordering[Person] {
-    def compare(a:Person, b:Person) = a.age compare b.age
+    def compare(a:Person, b:Person): Int = a.age.compare(b.age)
   }
 
   object NameOrdering extends Ordering[Person] {
-    def compare(a:Person, b:Person) =
+    def compare(a:Person, b:Person): Int =
       implicitly[Ordering[Tuple2[String,String]]].compare((a.last, a.first), (b.last, b.first))
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/a1fe59da/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
index 5a734ec..ada07ef 100644
--- a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
@@ -70,7 +70,7 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll {
     env.setupEndpoint("send-remotely", new RpcEndpoint {
       override val rpcEnv = env
 
-      override def receive = {
+      override def receive: PartialFunction[Any, Unit] = {
         case msg: String => message = msg
       }
     })
@@ -109,7 +109,7 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll {
     val rpcEndpointRef = env.setupEndpoint("ask-locally", new RpcEndpoint {
       override val rpcEnv = env
 
-      override def receiveAndReply(context: RpcCallContext) = {
+      override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
         case msg: String => {
           context.reply(msg)
         }
@@ -123,7 +123,7 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll {
     env.setupEndpoint("ask-remotely", new RpcEndpoint {
       override val rpcEnv = env
 
-      override def receiveAndReply(context: RpcCallContext) = {
+      override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
         case msg: String => {
           context.reply(msg)
         }
@@ -146,7 +146,7 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll {
     env.setupEndpoint("ask-timeout", new RpcEndpoint {
       override val rpcEnv = env
 
-      override def receiveAndReply(context: RpcCallContext) = {
+      override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
         case msg: String => {
           Thread.sleep(100)
           context.reply(msg)
@@ -182,7 +182,7 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll {
         calledMethods += "start"
       }
 
-      override def receive = {
+      override def receive: PartialFunction[Any, Unit] = {
         case msg: String =>
       }
 
@@ -206,7 +206,7 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll {
         throw new RuntimeException("Oops!")
       }
 
-      override def receive = {
+      override def receive: PartialFunction[Any, Unit] = {
         case m =>
       }
 
@@ -225,7 +225,7 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll {
     val endpointRef = env.setupEndpoint("onError-onStop", new RpcEndpoint {
       override val rpcEnv = env
 
-      override def receive = {
+      override def receive: PartialFunction[Any, Unit] = {
         case m =>
       }
 
@@ -250,8 +250,8 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll {
     val endpointRef = env.setupEndpoint("onError-receive", new RpcEndpoint {
       override val rpcEnv = env
 
-      override def receive = {
-        case m =>  throw new RuntimeException("Oops!")
+      override def receive: PartialFunction[Any, Unit] = {
+        case m => throw new RuntimeException("Oops!")
       }
 
       override def onError(cause: Throwable): Unit = {
@@ -277,7 +277,7 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll {
         callSelfSuccessfully = true
       }
 
-      override def receive = {
+      override def receive: PartialFunction[Any, Unit] = {
         case m =>
       }
     })
@@ -294,7 +294,7 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll {
     val endpointRef = env.setupEndpoint("self-receive", new RpcEndpoint {
       override val rpcEnv = env
 
-      override def receive = {
+      override def receive: PartialFunction[Any, Unit] = {
         case m => {
           self
           callSelfSuccessfully = true
@@ -316,7 +316,7 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll {
     val endpointRef = env.setupEndpoint("self-onStop", new RpcEndpoint {
       override val rpcEnv = env
 
-      override def receive = {
+      override def receive: PartialFunction[Any, Unit] = {
         case m =>
       }
 
@@ -343,7 +343,7 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll {
       val endpointRef = env.setupEndpoint(s"receive-in-sequence-$i", new ThreadSafeRpcEndpoint {
         override val rpcEnv = env
 
-        override def receive = {
+        override def receive: PartialFunction[Any, Unit] = {
           case m => result += 1
         }
 
@@ -372,7 +372,7 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll {
     val endpointRef = env.setupEndpoint("stop-reentrant", new RpcEndpoint {
       override val rpcEnv = env
 
-      override def receive = {
+      override def receive: PartialFunction[Any, Unit] = {
         case m =>
       }
 
@@ -394,7 +394,7 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll {
     val endpointRef = env.setupEndpoint("sendWithReply", new RpcEndpoint {
       override val rpcEnv = env
 
-      override def receiveAndReply(context: RpcCallContext) = {
+      override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
         case m => context.reply("ack")
       }
     })
@@ -410,7 +410,7 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll {
     env.setupEndpoint("sendWithReply-remotely", new RpcEndpoint {
       override val rpcEnv = env
 
-      override def receiveAndReply(context: RpcCallContext) = {
+      override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
         case m => context.reply("ack")
       }
     })
@@ -432,7 +432,7 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll {
     val endpointRef = env.setupEndpoint("sendWithReply-error", new RpcEndpoint {
       override val rpcEnv = env
 
-      override def receiveAndReply(context: RpcCallContext) = {
+      override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
         case m => context.sendFailure(new SparkException("Oops"))
       }
     })
@@ -450,7 +450,7 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll {
     env.setupEndpoint("sendWithReply-remotely-error", new RpcEndpoint {
       override val rpcEnv = env
 
-      override def receiveAndReply(context: RpcCallContext) = {
+      override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
         case msg: String => context.sendFailure(new SparkException("Oops"))
       }
     })
@@ -476,7 +476,7 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll {
     env.setupEndpoint("network-events", new ThreadSafeRpcEndpoint {
       override val rpcEnv = env
 
-      override def receive = {
+      override def receive: PartialFunction[Any, Unit] = {
         case "hello" =>
         case m => events += "receive" -> m
       }
@@ -519,7 +519,7 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll {
     env.setupEndpoint("sendWithReply-unserializable-error", new RpcEndpoint {
       override val rpcEnv = env
 
-      override def receiveAndReply(context: RpcCallContext) = {
+      override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
         case msg: String => context.sendFailure(new UnserializableException)
       }
     })

http://git-wip-us.apache.org/repos/asf/spark/blob/a1fe59da/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index eb759f0..3c52a8c 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -57,20 +57,18 @@ class MyRDD(
     locations: Seq[Seq[String]] = Nil) extends RDD[(Int, Int)](sc, dependencies) with Serializable {
   override def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] =
     throw new RuntimeException("should not be reached")
-  override def getPartitions = (0 until numPartitions).map(i => new Partition {
-    override def index = i
+  override def getPartitions: Array[Partition] = (0 until numPartitions).map(i => new Partition {
+    override def index: Int = i
   }).toArray
   override def getPreferredLocations(split: Partition): Seq[String] =
-    if (locations.isDefinedAt(split.index))
-      locations(split.index)
-    else
-      Nil
+    if (locations.isDefinedAt(split.index)) locations(split.index) else Nil
   override def toString: String = "DAGSchedulerSuiteRDD " + id
 }
 
 class DAGSchedulerSuiteDummyException extends Exception
 
-class DAGSchedulerSuite extends FunSuiteLike  with BeforeAndAfter with LocalSparkContext with Timeouts {
+class DAGSchedulerSuite
+  extends FunSuiteLike with BeforeAndAfter with LocalSparkContext with Timeouts {
 
   val conf = new SparkConf
   /** Set of TaskSets the DAGScheduler has requested executed. */
@@ -209,7 +207,8 @@ class DAGSchedulerSuite extends FunSuiteLike  with BeforeAndAfter with LocalSpar
     assert(taskSet.tasks.size >= results.size)
     for ((result, i) <- results.zipWithIndex) {
       if (i < taskSet.tasks.size) {
-        runEvent(CompletionEvent(taskSet.tasks(i), result._1, result._2, null, createFakeTaskInfo(), null))
+        runEvent(CompletionEvent(
+          taskSet.tasks(i), result._1, result._2, null, createFakeTaskInfo(), null))
       }
     }
   }
@@ -269,21 +268,23 @@ class DAGSchedulerSuite extends FunSuiteLike  with BeforeAndAfter with LocalSpar
     submit(new MyRDD(sc, 1, Nil), Array(0))
     complete(taskSets(0), List((Success, 42)))
     assert(results === Map(0 -> 42))
-    assertDataStructuresEmpty
+    assertDataStructuresEmpty()
   }
 
   test("local job") {
     val rdd = new PairOfIntsRDD(sc, Nil) {
       override def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] =
         Array(42 -> 0).iterator
-      override def getPartitions = Array( new Partition { override def index = 0 } )
-      override def getPreferredLocations(split: Partition) = Nil
-      override def toString = "DAGSchedulerSuite Local RDD"
+      override def getPartitions: Array[Partition] =
+        Array( new Partition { override def index: Int = 0 } )
+      override def getPreferredLocations(split: Partition): List[String] = Nil
+      override def toString: String = "DAGSchedulerSuite Local RDD"
     }
     val jobId = scheduler.nextJobId.getAndIncrement()
-    runEvent(JobSubmitted(jobId, rdd, jobComputeFunc, Array(0), true, CallSite("", ""), jobListener))
+    runEvent(
+      JobSubmitted(jobId, rdd, jobComputeFunc, Array(0), true, CallSite("", ""), jobListener))
     assert(results === Map(0 -> 42))
-    assertDataStructuresEmpty
+    assertDataStructuresEmpty()
   }
 
   test("local job oom") {
@@ -295,9 +296,10 @@ class DAGSchedulerSuite extends FunSuiteLike  with BeforeAndAfter with LocalSpar
       override def toString = "DAGSchedulerSuite Local RDD"
     }
     val jobId = scheduler.nextJobId.getAndIncrement()
-    runEvent(JobSubmitted(jobId, rdd, jobComputeFunc, Array(0), true, CallSite("", ""), jobListener))
+    runEvent(
+      JobSubmitted(jobId, rdd, jobComputeFunc, Array(0), true, CallSite("", ""), jobListener))
     assert(results.size == 0)
-    assertDataStructuresEmpty
+    assertDataStructuresEmpty()
   }
 
   test("run trivial job w/ dependency") {
@@ -306,7 +308,7 @@ class DAGSchedulerSuite extends FunSuiteLike  with BeforeAndAfter with LocalSpar
     submit(finalRdd, Array(0))
     complete(taskSets(0), Seq((Success, 42)))
     assert(results === Map(0 -> 42))
-    assertDataStructuresEmpty
+    assertDataStructuresEmpty()
   }
 
   test("cache location preferences w/ dependency") {
@@ -319,7 +321,7 @@ class DAGSchedulerSuite extends FunSuiteLike  with BeforeAndAfter with LocalSpar
     assertLocations(taskSet, Seq(Seq("hostA", "hostB")))
     complete(taskSet, Seq((Success, 42)))
     assert(results === Map(0 -> 42))
-    assertDataStructuresEmpty
+    assertDataStructuresEmpty()
   }
 
   test("regression test for getCacheLocs") {
@@ -335,7 +337,7 @@ class DAGSchedulerSuite extends FunSuiteLike  with BeforeAndAfter with LocalSpar
   }
 
   test("avoid exponential blowup when getting preferred locs list") {
-    // Build up a complex dependency graph with repeated zip operations, without preferred locations.
+    // Build up a complex dependency graph with repeated zip operations, without preferred locations
     var rdd: RDD[_] = new MyRDD(sc, 1, Nil)
     (1 to 30).foreach(_ => rdd = rdd.zip(rdd))
     // getPreferredLocs runs quickly, indicating that exponential graph traversal is avoided.
@@ -357,7 +359,7 @@ class DAGSchedulerSuite extends FunSuiteLike  with BeforeAndAfter with LocalSpar
     assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
     assert(sparkListener.failedStages.contains(0))
     assert(sparkListener.failedStages.size === 1)
-    assertDataStructuresEmpty
+    assertDataStructuresEmpty()
   }
 
   test("trivial job failure") {
@@ -367,7 +369,7 @@ class DAGSchedulerSuite extends FunSuiteLike  with BeforeAndAfter with LocalSpar
     assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
     assert(sparkListener.failedStages.contains(0))
     assert(sparkListener.failedStages.size === 1)
-    assertDataStructuresEmpty
+    assertDataStructuresEmpty()
   }
 
   test("trivial job cancellation") {
@@ -378,7 +380,7 @@ class DAGSchedulerSuite extends FunSuiteLike  with BeforeAndAfter with LocalSpar
     assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
     assert(sparkListener.failedStages.contains(0))
     assert(sparkListener.failedStages.size === 1)
-    assertDataStructuresEmpty
+    assertDataStructuresEmpty()
   }
 
   test("job cancellation no-kill backend") {
@@ -387,18 +389,20 @@ class DAGSchedulerSuite extends FunSuiteLike  with BeforeAndAfter with LocalSpar
     val noKillTaskScheduler = new TaskScheduler() {
       override def rootPool: Pool = null
       override def schedulingMode: SchedulingMode = SchedulingMode.NONE
-      override def start() = {}
-      override def stop() = {}
-      override def submitTasks(taskSet: TaskSet) = {
+      override def start(): Unit = {}
+      override def stop(): Unit = {}
+      override def submitTasks(taskSet: TaskSet): Unit = {
         taskSets += taskSet
       }
       override def cancelTasks(stageId: Int, interruptThread: Boolean) {
         throw new UnsupportedOperationException
       }
-      override def setDAGScheduler(dagScheduler: DAGScheduler) = {}
-      override def defaultParallelism() = 2
-      override def executorHeartbeatReceived(execId: String, taskMetrics: Array[(Long, TaskMetrics)],
-        blockManagerId: BlockManagerId): Boolean = true
+      override def setDAGScheduler(dagScheduler: DAGScheduler): Unit = {}
+      override def defaultParallelism(): Int = 2
+      override def executorHeartbeatReceived(
+          execId: String,
+          taskMetrics: Array[(Long, TaskMetrics)],
+          blockManagerId: BlockManagerId): Boolean = true
       override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {}
     }
     val noKillScheduler = new DAGScheduler(
@@ -422,7 +426,7 @@ class DAGSchedulerSuite extends FunSuiteLike  with BeforeAndAfter with LocalSpar
     // When the task set completes normally, state should be correctly updated.
     complete(taskSets(0), Seq((Success, 42)))
     assert(results === Map(0 -> 42))
-    assertDataStructuresEmpty
+    assertDataStructuresEmpty()
 
     assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
     assert(sparkListener.failedStages.isEmpty)
@@ -442,7 +446,7 @@ class DAGSchedulerSuite extends FunSuiteLike  with BeforeAndAfter with LocalSpar
            Array(makeBlockManagerId("hostA"), makeBlockManagerId("hostB")))
     complete(taskSets(1), Seq((Success, 42)))
     assert(results === Map(0 -> 42))
-    assertDataStructuresEmpty
+    assertDataStructuresEmpty()
   }
 
   test("run trivial shuffle with fetch failure") {
@@ -465,10 +469,11 @@ class DAGSchedulerSuite extends FunSuiteLike  with BeforeAndAfter with LocalSpar
     // have the 2nd attempt pass
     complete(taskSets(2), Seq((Success, makeMapStatus("hostA", 1))))
     // we can see both result blocks now
-    assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1.host) === Array("hostA", "hostB"))
+    assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1.host) ===
+      Array("hostA", "hostB"))
     complete(taskSets(3), Seq((Success, 43)))
     assert(results === Map(0 -> 42, 1 -> 43))
-    assertDataStructuresEmpty
+    assertDataStructuresEmpty()
   }
 
   test("trivial shuffle with multiple fetch failures") {
@@ -521,19 +526,23 @@ class DAGSchedulerSuite extends FunSuiteLike  with BeforeAndAfter with LocalSpar
     assert(newEpoch > oldEpoch)
     val taskSet = taskSets(0)
     // should be ignored for being too old
-    runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA", 1), null, createFakeTaskInfo(), null))
+    runEvent(CompletionEvent(
+      taskSet.tasks(0), Success, makeMapStatus("hostA", 1), null, createFakeTaskInfo(), null))
     // should work because it's a non-failed host
-    runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostB", 1), null, createFakeTaskInfo(), null))
+    runEvent(CompletionEvent(
+      taskSet.tasks(0), Success, makeMapStatus("hostB", 1), null, createFakeTaskInfo(), null))
     // should be ignored for being too old
-    runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA", 1), null, createFakeTaskInfo(), null))
+    runEvent(CompletionEvent(
+      taskSet.tasks(0), Success, makeMapStatus("hostA", 1), null, createFakeTaskInfo(), null))
     // should work because it's a new epoch
     taskSet.tasks(1).epoch = newEpoch
-    runEvent(CompletionEvent(taskSet.tasks(1), Success, makeMapStatus("hostA", 1), null, createFakeTaskInfo(), null))
+    runEvent(CompletionEvent(
+      taskSet.tasks(1), Success, makeMapStatus("hostA", 1), null, createFakeTaskInfo(), null))
     assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1) ===
            Array(makeBlockManagerId("hostB"), makeBlockManagerId("hostA")))
     complete(taskSets(1), Seq((Success, 42), (Success, 43)))
     assert(results === Map(0 -> 42, 1 -> 43))
-    assertDataStructuresEmpty
+    assertDataStructuresEmpty()
   }
 
   test("run shuffle with map stage failure") {
@@ -552,7 +561,7 @@ class DAGSchedulerSuite extends FunSuiteLike  with BeforeAndAfter with LocalSpar
     assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
     assert(sparkListener.failedStages.toSet === Set(0))
 
-    assertDataStructuresEmpty
+    assertDataStructuresEmpty()
   }
 
   /**
@@ -586,7 +595,7 @@ class DAGSchedulerSuite extends FunSuiteLike  with BeforeAndAfter with LocalSpar
     class FailureRecordingJobListener() extends JobListener {
       var failureMessage: String = _
       override def taskSucceeded(index: Int, result: Any) {}
-      override def jobFailed(exception: Exception) = { failureMessage = exception.getMessage }
+      override def jobFailed(exception: Exception): Unit = { failureMessage = exception.getMessage }
     }
     val listener1 = new FailureRecordingJobListener()
     val listener2 = new FailureRecordingJobListener()
@@ -606,7 +615,7 @@ class DAGSchedulerSuite extends FunSuiteLike  with BeforeAndAfter with LocalSpar
 
     assert(listener1.failureMessage === s"Job aborted due to stage failure: $stageFailureMessage")
     assert(listener2.failureMessage === s"Job aborted due to stage failure: $stageFailureMessage")
-    assertDataStructuresEmpty
+    assertDataStructuresEmpty()
   }
 
   test("run trivial shuffle with out-of-band failure and retry") {
@@ -629,7 +638,7 @@ class DAGSchedulerSuite extends FunSuiteLike  with BeforeAndAfter with LocalSpar
            Array(makeBlockManagerId("hostC"), makeBlockManagerId("hostB")))
     complete(taskSets(2), Seq((Success, 42)))
     assert(results === Map(0 -> 42))
-    assertDataStructuresEmpty
+    assertDataStructuresEmpty()
   }
 
   test("recursive shuffle failures") {
@@ -658,7 +667,7 @@ class DAGSchedulerSuite extends FunSuiteLike  with BeforeAndAfter with LocalSpar
     complete(taskSets(4), Seq((Success, makeMapStatus("hostA", 1))))
     complete(taskSets(5), Seq((Success, 42)))
     assert(results === Map(0 -> 42))
-    assertDataStructuresEmpty
+    assertDataStructuresEmpty()
   }
 
   test("cached post-shuffle") {
@@ -690,7 +699,7 @@ class DAGSchedulerSuite extends FunSuiteLike  with BeforeAndAfter with LocalSpar
     complete(taskSets(3), Seq((Success, makeMapStatus("hostD", 1))))
     complete(taskSets(4), Seq((Success, 42)))
     assert(results === Map(0 -> 42))
-    assertDataStructuresEmpty
+    assertDataStructuresEmpty()
   }
 
   test("misbehaved accumulator should not crash DAGScheduler and SparkContext") {
@@ -742,7 +751,7 @@ class DAGSchedulerSuite extends FunSuiteLike  with BeforeAndAfter with LocalSpar
   }
 
   test("accumulator not calculated for resubmitted result stage") {
-    //just for register
+    // just for register
     val accum = new Accumulator[Int](0, AccumulatorParam.IntAccumulatorParam)
     val finalRdd = new MyRDD(sc, 1, Nil)
     submit(finalRdd, Array(0))
@@ -754,7 +763,7 @@ class DAGSchedulerSuite extends FunSuiteLike  with BeforeAndAfter with LocalSpar
 
     assert(accVal === 1)
 
-    assertDataStructuresEmpty
+    assertDataStructuresEmpty()
   }
 
   /**
@@ -774,7 +783,7 @@ class DAGSchedulerSuite extends FunSuiteLike  with BeforeAndAfter with LocalSpar
   private def makeBlockManagerId(host: String): BlockManagerId =
     BlockManagerId("exec-" + host, host, 12345)
 
-  private def assertDataStructuresEmpty = {
+  private def assertDataStructuresEmpty(): Unit = {
     assert(scheduler.activeJobs.isEmpty)
     assert(scheduler.failedStages.isEmpty)
     assert(scheduler.jobIdToActiveJob.isEmpty)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org