Posted to commits@spark.apache.org by pw...@apache.org on 2014/01/13 06:31:31 UTC

[1/6] Remove simple redundant return statement for Scala methods/functions:

Updated Branches:
  refs/heads/master 405bfe86e -> 0ab505a29


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index be323d7..efeee31 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -205,9 +205,10 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf)
     }
     //check for ports
     if (srcUri.getPort() != dstUri.getPort()) {
-      return false
+      false
+    } else {
+      true
     }
-    return true
   }
 
   /** Copy the file into HDFS if needed. */


[5/6] git commit: Address code review concerns and comments.

Posted by pw...@apache.org.
Address code review concerns and comments.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/5a8abfb7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/5a8abfb7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/5a8abfb7

Branch: refs/heads/master
Commit: 5a8abfb70efd89ec4120c7f78596d8b32a9f4f3d
Parents: f1c5eca
Author: Henry Saputra <hs...@apache.org>
Authored: Sun Jan 12 19:15:09 2014 -0800
Committer: Henry Saputra <hs...@apache.org>
Committed: Sun Jan 12 19:15:09 2014 -0800

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala   | 2 +-
 .../src/main/scala/org/apache/spark/api/python/PythonRDD.scala | 3 ++-
 core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala    | 6 +++---
 .../scala/org/apache/spark/storage/BlockManagerWorker.scala    | 6 +++---
 .../scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala   | 3 ++-
 .../test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala | 5 +----
 .../src/main/scala/org/apache/spark/deploy/yarn/Client.scala   | 6 +++---
 .../src/main/scala/org/apache/spark/deploy/yarn/Client.scala   | 6 +++---
 8 files changed, 18 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/5a8abfb7/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
index bba873a..4e63117 100644
--- a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
+++ b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
@@ -189,7 +189,7 @@ object SparkHadoopWriter {
     if (path == null) {
       throw new IllegalArgumentException("Output path is null")
     }
-    var outputPath = new Path(path)
+    val outputPath = new Path(path)
     val fs = outputPath.getFileSystem(conf)
     if (outputPath == null || fs == null) {
       throw new IllegalArgumentException("Incorrectly formatted output path")

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/5a8abfb7/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 8830de7..82527fe 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -95,7 +95,7 @@ private[spark] class PythonRDD[T: ClassTag](
 
     // Return an iterator that read lines from the process's stdout
     val stream = new DataInputStream(new BufferedInputStream(worker.getInputStream, bufferSize))
-    new Iterator[Array[Byte]] {
+    val stdoutIterator = new Iterator[Array[Byte]] {
       def next(): Array[Byte] = {
         val obj = _nextObj
         if (hasNext) {
@@ -156,6 +156,7 @@ private[spark] class PythonRDD[T: ClassTag](
 
       def hasNext = _nextObj.length != 0
     }
+    stdoutIterator
   }
 
   val asJavaRDD : JavaRDD[Array[Byte]] = JavaRDD.fromRDD(this)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/5a8abfb7/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
index a5394a2..cefcc3d 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
@@ -295,9 +295,9 @@ private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanc
 
     val prefPartActual = prefPart.get
 
-    if (minPowerOfTwo.size + slack <= prefPartActual.size)  // more imbalance than the slack allows
+    if (minPowerOfTwo.size + slack <= prefPartActual.size) { // more imbalance than the slack allows
       minPowerOfTwo  // prefer balance over locality
-    else {
+    } else {
       prefPartActual // prefer locality over balance
     }
   }
@@ -331,7 +331,7 @@ private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanc
    */
   def run(): Array[PartitionGroup] = {
     setupGroups(math.min(prev.partitions.length, maxPartitions))   // setup the groups (bins)
-    throwBalls()             // assign partitions (balls) to each group (bins)
+    throwBalls() // assign partitions (balls) to each group (bins)
     getPartitions
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/5a8abfb7/core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala
index a36abe0..42f52d7 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala
@@ -45,7 +45,7 @@ private[spark] class BlockManagerWorker(val blockManager: BlockManager) extends
           Some(new BlockMessageArray(responseMessages).toBufferMessage)
         } catch {
           case e: Exception => logError("Exception handling buffer message", e)
-          return None
+          None
         }
       }
       case otherMessage: Any => {
@@ -111,7 +111,7 @@ private[spark] object BlockManagerWorker extends Logging {
     val blockMessageArray = new BlockMessageArray(blockMessage)
     val resultMessage = connectionManager.sendMessageReliablySync(
         toConnManagerId, blockMessageArray.toBufferMessage)
-    return (resultMessage != None)
+    resultMessage != None
   }
 
   def syncGetBlock(msg: GetBlock, toConnManagerId: ConnectionManagerId): ByteBuffer = {
@@ -130,7 +130,7 @@ private[spark] object BlockManagerWorker extends Logging {
             return blockMessage.getData
           })
       }
-      case None => logDebug("No response message received"); return null
+      case None => logDebug("No response message received")
     }
     null
   }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/5a8abfb7/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 14f89d5..f0236ef 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -122,7 +122,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
         locations: Seq[Seq[String]] = Nil
       ): MyRDD = {
     val maxPartition = numPartitions - 1
-    new MyRDD(sc, dependencies) {
+    val newRDD = new MyRDD(sc, dependencies) {
       override def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] =
         throw new RuntimeException("should not be reached")
       override def getPartitions = (0 to maxPartition).map(i => new Partition {
@@ -135,6 +135,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
           Nil
       override def toString: String = "DAGSchedulerSuiteRDD " + id
     }
+    newRDD
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/5a8abfb7/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala
index 3880e68..2910291 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala
@@ -42,10 +42,7 @@ class JobLoggerSuite extends FunSuite with LocalSparkContext with ShouldMatchers
       def buildJobDepTest(jobID: Int, stage: Stage) = buildJobDep(jobID, stage) 
     }
     type MyRDD = RDD[(Int, Int)]
-    def makeRdd(
-        numPartitions: Int,
-        dependencies: List[Dependency[_]]
-      ): MyRDD = {
+    def makeRdd(numPartitions: Int, dependencies: List[Dependency[_]]): MyRDD = {
       val maxPartition = numPartitions - 1
       new MyRDD(sc, dependencies) {
         override def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] =

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/5a8abfb7/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index e1fe09e..e56bc02 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -191,10 +191,10 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf)
     }
     //check for ports
     if (srcUri.getPort() != dstUri.getPort()) {
-      false
-    } else {
-      true
+      return false
     }
+
+    true
   }
 
   /** Copy the file into HDFS if needed. */

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/5a8abfb7/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index c084485..51d9adb 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -206,10 +206,10 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf)
     }
     //check for ports
     if (srcUri.getPort() != dstUri.getPort()) {
-      false
-    } else {
-      true
+      return false
     }
+
+    true
   }
 
   /** Copy the file into HDFS if needed. */


[4/6] git commit: Fix accidental comment modification.

Posted by pw...@apache.org.
Fix accidental comment modification.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/f1c5eca4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/f1c5eca4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/f1c5eca4

Branch: refs/heads/master
Commit: f1c5eca494f798fc22c46d245435381a89098fe4
Parents: 91a5636
Author: Henry Saputra <hs...@apache.org>
Authored: Sun Jan 12 10:40:21 2014 -0800
Committer: Henry Saputra <hs...@apache.org>
Committed: Sun Jan 12 10:40:21 2014 -0800

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/MapOutputTracker.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f1c5eca4/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index 57bdf22..80a611b 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -185,7 +185,7 @@ private[spark] class MapOutputTracker(conf: SparkConf) extends Logging {
 private[spark] class MapOutputTrackerMaster(conf: SparkConf)
   extends MapOutputTracker(conf) {
 
-  // Cache a serialized version of the output statuses for each shuffle to send them out faster                          return
+  // Cache a serialized version of the output statuses for each shuffle to send them out faster
   private var cacheEpoch = epoch
   private val cachedSerializedStatuses = new TimeStampedHashMap[Int, Array[Byte]]
 


[3/6] git commit: Merge branch 'master' into remove_simpleredundantreturn_scala

Posted by pw...@apache.org.
Merge branch 'master' into remove_simpleredundantreturn_scala


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/91a56360
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/91a56360
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/91a56360

Branch: refs/heads/master
Commit: 91a563608e301bb243fca3765d569bde65ad747c
Parents: 93a65e5 288a878
Author: Henry Saputra <hs...@apache.org>
Authored: Sun Jan 12 10:34:13 2014 -0800
Committer: Henry Saputra <hs...@apache.org>
Committed: Sun Jan 12 10:34:13 2014 -0800

----------------------------------------------------------------------
 bin/pyspark                                     |   7 +-
 conf/log4j.properties.template                  |   4 +-
 core/pom.xml                                    |  10 +
 .../org/apache/spark/log4j-defaults.properties  |   4 +-
 .../scala/org/apache/spark/Aggregator.scala     |  61 ++--
 .../scala/org/apache/spark/SparkContext.scala   | 122 ++++---
 .../main/scala/org/apache/spark/SparkEnv.scala  |   6 +-
 .../apache/spark/api/java/JavaDoubleRDD.scala   |  12 +-
 .../org/apache/spark/api/java/JavaPairRDD.scala |   6 +
 .../org/apache/spark/api/java/JavaRDD.scala     |   6 +
 .../org/apache/spark/api/java/JavaRDDLike.scala |   6 +
 .../spark/api/java/JavaSparkContext.scala       |  54 ++-
 .../scala/org/apache/spark/deploy/Client.scala  | 151 ++++++++
 .../apache/spark/deploy/ClientArguments.scala   | 117 +++++++
 .../org/apache/spark/deploy/DeployMessage.scala |  52 ++-
 .../apache/spark/deploy/DriverDescription.scala |  29 ++
 .../apache/spark/deploy/client/AppClient.scala  | 201 +++++++++++
 .../spark/deploy/client/AppClientListener.scala |  39 +++
 .../org/apache/spark/deploy/client/Client.scala | 200 -----------
 .../spark/deploy/client/ClientListener.scala    |  39 ---
 .../apache/spark/deploy/client/TestClient.scala |   4 +-
 .../apache/spark/deploy/master/DriverInfo.scala |  36 ++
 .../spark/deploy/master/DriverState.scala       |  33 ++
 .../master/FileSystemPersistenceEngine.scala    |  17 +-
 .../org/apache/spark/deploy/master/Master.scala | 189 +++++++++-
 .../spark/deploy/master/PersistenceEngine.scala |  11 +-
 .../apache/spark/deploy/master/WorkerInfo.scala |  20 +-
 .../master/ZooKeeperPersistenceEngine.scala     |  14 +-
 .../deploy/master/ui/ApplicationPage.scala      |   4 +-
 .../spark/deploy/master/ui/IndexPage.scala      |  56 ++-
 .../spark/deploy/worker/CommandUtils.scala      |  63 ++++
 .../spark/deploy/worker/DriverRunner.scala      | 234 +++++++++++++
 .../spark/deploy/worker/DriverWrapper.scala     |  31 ++
 .../spark/deploy/worker/ExecutorRunner.scala    |  67 +---
 .../org/apache/spark/deploy/worker/Worker.scala |  63 +++-
 .../spark/deploy/worker/WorkerWatcher.scala     |  55 +++
 .../spark/deploy/worker/ui/IndexPage.scala      |  65 +++-
 .../spark/deploy/worker/ui/WorkerWebUI.scala    |  43 ++-
 .../executor/CoarseGrainedExecutorBackend.scala |  27 +-
 .../org/apache/spark/executor/Executor.scala    |   5 +
 .../org/apache/spark/rdd/CoGroupedRDD.scala     |  88 +++--
 .../scala/org/apache/spark/rdd/HadoopRDD.scala  |  37 +-
 .../org/apache/spark/rdd/NewHadoopRDD.scala     |  36 +-
 .../org/apache/spark/rdd/PairRDDFunctions.scala |   7 +-
 .../apache/spark/scheduler/DAGScheduler.scala   |   4 +-
 .../cluster/SparkDeploySchedulerBackend.scala   |  10 +-
 .../org/apache/spark/storage/BlockId.scala      |  12 +-
 .../org/apache/spark/storage/BlockManager.scala |   4 +-
 .../spark/storage/BlockManagerMasterActor.scala |   7 +-
 .../spark/storage/BlockObjectWriter.scala       |   4 +
 .../apache/spark/storage/DiskBlockManager.scala |  11 +-
 .../spark/storage/ShuffleBlockManager.scala     |   2 +-
 .../org/apache/spark/ui/jobs/StagePage.scala    |   2 +-
 .../org/apache/spark/ui/jobs/StageTable.scala   |   2 +-
 .../org/apache/spark/util/AppendOnlyMap.scala   | 237 -------------
 .../apache/spark/util/TimeStampedHashMap.scala  |  17 +-
 .../scala/org/apache/spark/util/Utils.scala     |  28 +-
 .../scala/org/apache/spark/util/Vector.scala    |   8 +
 .../spark/util/collection/AppendOnlyMap.scala   | 297 ++++++++++++++++
 .../util/collection/ExternalAppendOnlyMap.scala | 350 +++++++++++++++++++
 .../collection/SizeTrackingAppendOnlyMap.scala  | 101 ++++++
 .../apache/spark/deploy/JsonProtocolSuite.scala |  40 ++-
 .../spark/deploy/worker/DriverRunnerTest.scala  | 131 +++++++
 .../deploy/worker/ExecutorRunnerTest.scala      |   4 +-
 .../deploy/worker/WorkerWatcherSuite.scala      |  32 ++
 .../apache/spark/util/AppendOnlyMapSuite.scala  | 154 --------
 .../util/SizeTrackingAppendOnlyMapSuite.scala   | 120 +++++++
 .../org/apache/spark/util/VectorSuite.scala     |  44 +++
 .../util/collection/AppendOnlyMapSuite.scala    | 198 +++++++++++
 .../collection/ExternalAppendOnlyMapSuite.scala | 230 ++++++++++++
 docs/configuration.md                           |  25 +-
 docs/python-programming-guide.md                |   5 +-
 docs/running-on-yarn.md                         |  15 +-
 docs/spark-standalone.md                        |  38 +-
 ec2/spark_ec2.py                                |  12 +-
 .../streaming/examples/JavaFlumeEventCount.java |   2 +
 .../streaming/examples/JavaKafkaWordCount.java  |   2 +
 .../examples/JavaNetworkWordCount.java          |  11 +-
 .../streaming/examples/JavaQueueStream.java     |   2 +
 .../spark/examples/DriverSubmissionTest.scala   |  46 +++
 .../streaming/examples/ActorWordCount.scala     |  12 +-
 .../streaming/examples/FlumeEventCount.scala    |   4 +-
 .../streaming/examples/HdfsWordCount.scala      |   3 +-
 .../streaming/examples/KafkaWordCount.scala     |   5 +-
 .../streaming/examples/MQTTWordCount.scala      |   8 +-
 .../streaming/examples/NetworkWordCount.scala   |   7 +-
 .../spark/streaming/examples/QueueStream.scala  |   8 +-
 .../streaming/examples/RawNetworkGrep.scala     |   5 +-
 .../examples/RecoverableNetworkWordCount.scala  | 118 +++++++
 .../examples/StatefulNetworkWordCount.scala     |   2 +
 .../streaming/examples/StreamingExamples.scala  |  21 ++
 .../streaming/examples/TwitterAlgebirdCMS.scala |  10 +-
 .../streaming/examples/TwitterAlgebirdHLL.scala |   9 +-
 .../streaming/examples/TwitterPopularTags.scala |   2 +
 .../streaming/examples/ZeroMQWordCount.scala    |   1 +
 .../examples/clickstream/PageViewStream.scala   |   4 +-
 external/kafka/pom.xml                          |   4 +-
 .../spark/streaming/mqtt/MQTTInputDStream.scala |   2 +-
 .../apache/spark/mllib/recommendation/ALS.scala |  15 +-
 pom.xml                                         |  17 +
 project/SparkBuild.scala                        |  19 +-
 .../org/apache/spark/streaming/Checkpoint.scala | 188 ++++++----
 .../org/apache/spark/streaming/DStream.scala    |  17 +-
 .../spark/streaming/DStreamCheckpointData.scala | 106 +++---
 .../apache/spark/streaming/DStreamGraph.scala   |  38 +-
 .../spark/streaming/StreamingContext.scala      |  75 +++-
 .../api/java/JavaStreamingContext.scala         |  96 ++++-
 .../streaming/dstream/FileInputDStream.scala    |  42 ++-
 .../streaming/scheduler/JobGenerator.scala      |  31 +-
 .../streaming/util/MasterFailureTest.scala      |  55 +--
 .../apache/spark/streaming/JavaAPISuite.java    |  29 +-
 .../spark/streaming/CheckpointSuite.scala       |  10 +-
 .../spark/deploy/yarn/ApplicationMaster.scala   |  13 +-
 .../spark/deploy/yarn/WorkerLauncher.scala      |  28 +-
 .../cluster/YarnClientSchedulerBackend.scala    |  50 +--
 .../spark/deploy/yarn/ApplicationMaster.scala   |  13 +-
 .../org/apache/spark/deploy/yarn/Client.scala   |   1 +
 .../spark/deploy/yarn/WorkerLauncher.scala      |  28 +-
 118 files changed, 4403 insertions(+), 1231 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/91a56360/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/91a56360/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/91a56360/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/91a56360/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/91a56360/core/src/main/scala/org/apache/spark/util/Vector.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/91a56360/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala
index 0000000,d98c7aa..b8c852b
mode 000000,100644..100644
--- a/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala
@@@ -1,0 -1,297 +1,297 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.spark.util.collection
+ 
+ import java.util.{Arrays, Comparator}
+ 
+ /**
+  * A simple open hash table optimized for the append-only use case, where keys
+  * are never removed, but the value for each key may be changed.
+  *
+  * This implementation uses quadratic probing with a power-of-2 hash table
+  * size, which is guaranteed to explore all spaces for each key (see
+  * http://en.wikipedia.org/wiki/Quadratic_probing).
+  *
+  * TODO: Cache the hash values of each key? java.util.HashMap does that.
+  */
+ private[spark]
+ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K,
+   V)] with Serializable {
+   require(initialCapacity <= (1 << 29), "Can't make capacity bigger than 2^29 elements")
+   require(initialCapacity >= 1, "Invalid initial capacity")
+ 
+   private var capacity = nextPowerOf2(initialCapacity)
+   private var mask = capacity - 1
+   private var curSize = 0
+   private var growThreshold = (LOAD_FACTOR * capacity).toInt
+ 
+   // Holds keys and values in the same array for memory locality; specifically, the order of
+   // elements is key0, value0, key1, value1, key2, value2, etc.
+   private var data = new Array[AnyRef](2 * capacity)
+ 
+   // Treat the null key differently so we can use nulls in "data" to represent empty items.
+   private var haveNullValue = false
+   private var nullValue: V = null.asInstanceOf[V]
+ 
+   // Triggered by destructiveSortedIterator; the underlying data array may no longer be used
+   private var destroyed = false
+   private val destructionMessage = "Map state is invalid from destructive sorting!"
+ 
+   private val LOAD_FACTOR = 0.7
+ 
+   /** Get the value for a given key */
+   def apply(key: K): V = {
+     assert(!destroyed, destructionMessage)
+     val k = key.asInstanceOf[AnyRef]
+     if (k.eq(null)) {
+       return nullValue
+     }
+     var pos = rehash(k.hashCode) & mask
+     var i = 1
+     while (true) {
+       val curKey = data(2 * pos)
+       if (k.eq(curKey) || k.equals(curKey)) {
+         return data(2 * pos + 1).asInstanceOf[V]
+       } else if (curKey.eq(null)) {
+         return null.asInstanceOf[V]
+       } else {
+         val delta = i
+         pos = (pos + delta) & mask
+         i += 1
+       }
+     }
 -    return null.asInstanceOf[V]
++    null.asInstanceOf[V]
+   }
+ 
+   /** Set the value for a key */
+   def update(key: K, value: V): Unit = {
+     assert(!destroyed, destructionMessage)
+     val k = key.asInstanceOf[AnyRef]
+     if (k.eq(null)) {
+       if (!haveNullValue) {
+         incrementSize()
+       }
+       nullValue = value
+       haveNullValue = true
+       return
+     }
+     var pos = rehash(key.hashCode) & mask
+     var i = 1
+     while (true) {
+       val curKey = data(2 * pos)
+       if (curKey.eq(null)) {
+         data(2 * pos) = k
+         data(2 * pos + 1) = value.asInstanceOf[AnyRef]
+         incrementSize()  // Since we added a new key
+         return
+       } else if (k.eq(curKey) || k.equals(curKey)) {
+         data(2 * pos + 1) = value.asInstanceOf[AnyRef]
+         return
+       } else {
+         val delta = i
+         pos = (pos + delta) & mask
+         i += 1
+       }
+     }
+   }
+ 
+   /**
+    * Set the value for key to updateFunc(hadValue, oldValue), where oldValue will be the old value
+    * for key, if any, or null otherwise. Returns the newly updated value.
+    */
+   def changeValue(key: K, updateFunc: (Boolean, V) => V): V = {
+     assert(!destroyed, destructionMessage)
+     val k = key.asInstanceOf[AnyRef]
+     if (k.eq(null)) {
+       if (!haveNullValue) {
+         incrementSize()
+       }
+       nullValue = updateFunc(haveNullValue, nullValue)
+       haveNullValue = true
+       return nullValue
+     }
+     var pos = rehash(k.hashCode) & mask
+     var i = 1
+     while (true) {
+       val curKey = data(2 * pos)
+       if (k.eq(curKey) || k.equals(curKey)) {
+         val newValue = updateFunc(true, data(2 * pos + 1).asInstanceOf[V])
+         data(2 * pos + 1) = newValue.asInstanceOf[AnyRef]
+         return newValue
+       } else if (curKey.eq(null)) {
+         val newValue = updateFunc(false, null.asInstanceOf[V])
+         data(2 * pos) = k
+         data(2 * pos + 1) = newValue.asInstanceOf[AnyRef]
+         incrementSize()
+         return newValue
+       } else {
+         val delta = i
+         pos = (pos + delta) & mask
+         i += 1
+       }
+     }
+     null.asInstanceOf[V] // Never reached but needed to keep compiler happy
+   }
+ 
+   /** Iterator method from Iterable */
+   override def iterator: Iterator[(K, V)] = {
+     assert(!destroyed, destructionMessage)
+     new Iterator[(K, V)] {
+       var pos = -1
+ 
+       /** Get the next value we should return from next(), or null if we're finished iterating */
+       def nextValue(): (K, V) = {
+         if (pos == -1) {    // Treat position -1 as looking at the null value
+           if (haveNullValue) {
+             return (null.asInstanceOf[K], nullValue)
+           }
+           pos += 1
+         }
+         while (pos < capacity) {
+           if (!data(2 * pos).eq(null)) {
+             return (data(2 * pos).asInstanceOf[K], data(2 * pos + 1).asInstanceOf[V])
+           }
+           pos += 1
+         }
+         null
+       }
+ 
+       override def hasNext: Boolean = nextValue() != null
+ 
+       override def next(): (K, V) = {
+         val value = nextValue()
+         if (value == null) {
+           throw new NoSuchElementException("End of iterator")
+         }
+         pos += 1
+         value
+       }
+     }
+   }
+ 
+   override def size: Int = curSize
+ 
+   /** Increase table size by 1, rehashing if necessary */
+   private def incrementSize() {
+     curSize += 1
+     if (curSize > growThreshold) {
+       growTable()
+     }
+   }
+ 
+   /**
+    * Re-hash a value to deal better with hash functions that don't differ in the lower bits.
+    * We use the Murmur Hash 3 finalization step that's also used in fastutil.
+    */
+   private def rehash(h: Int): Int = {
+     it.unimi.dsi.fastutil.HashCommon.murmurHash3(h)
+   }
+ 
+   /** Double the table's size and re-hash everything */
+   protected def growTable() {
+     val newCapacity = capacity * 2
+     if (newCapacity >= (1 << 30)) {
+       // We can't make the table this big because we want an array of 2x
+       // that size for our data, but array sizes are at most Int.MaxValue
+       throw new Exception("Can't make capacity bigger than 2^29 elements")
+     }
+     val newData = new Array[AnyRef](2 * newCapacity)
+     val newMask = newCapacity - 1
+     // Insert all our old values into the new array. Note that because our old keys are
+     // unique, there's no need to check for equality here when we insert.
+     var oldPos = 0
+     while (oldPos < capacity) {
+       if (!data(2 * oldPos).eq(null)) {
+         val key = data(2 * oldPos)
+         val value = data(2 * oldPos + 1)
+         var newPos = rehash(key.hashCode) & newMask
+         var i = 1
+         var keepGoing = true
+         while (keepGoing) {
+           val curKey = newData(2 * newPos)
+           if (curKey.eq(null)) {
+             newData(2 * newPos) = key
+             newData(2 * newPos + 1) = value
+             keepGoing = false
+           } else {
+             val delta = i
+             newPos = (newPos + delta) & newMask
+             i += 1
+           }
+         }
+       }
+       oldPos += 1
+     }
+     data = newData
+     capacity = newCapacity
+     mask = newMask
+     growThreshold = (LOAD_FACTOR * newCapacity).toInt
+   }
+ 
+   private def nextPowerOf2(n: Int): Int = {
+     val highBit = Integer.highestOneBit(n)
+     if (highBit == n) n else highBit << 1
+   }
+ 
+   /**
+    * Return an iterator of the map in sorted order. This provides a way to sort the map without
+    * using additional memory, at the expense of destroying the validity of the map.
+    */
+   def destructiveSortedIterator(cmp: Comparator[(K, V)]): Iterator[(K, V)] = {
+     destroyed = true
+     // Pack KV pairs into the front of the underlying array
+     var keyIndex, newIndex = 0
+     while (keyIndex < capacity) {
+       if (data(2 * keyIndex) != null) {
+         data(newIndex) = (data(2 * keyIndex), data(2 * keyIndex + 1))
+         newIndex += 1
+       }
+       keyIndex += 1
+     }
+     assert(curSize == newIndex + (if (haveNullValue) 1 else 0))
+ 
+     // Sort by the given ordering
+     val rawOrdering = new Comparator[AnyRef] {
+       def compare(x: AnyRef, y: AnyRef): Int = {
+         cmp.compare(x.asInstanceOf[(K, V)], y.asInstanceOf[(K, V)])
+       }
+     }
+     Arrays.sort(data, 0, newIndex, rawOrdering)
+ 
+     new Iterator[(K, V)] {
+       var i = 0
+       var nullValueReady = haveNullValue
+       def hasNext: Boolean = (i < newIndex || nullValueReady)
+       def next(): (K, V) = {
+         if (nullValueReady) {
+           nullValueReady = false
+           (null.asInstanceOf[K], nullValue)
+         } else {
+           val item = data(i).asInstanceOf[(K, V)]
+           i += 1
+           item
+         }
+       }
+     }
+   }
+ 
+   /**
+    * Return whether the next insert will cause the map to grow
+    */
+   def atGrowThreshold: Boolean = curSize == growThreshold
+ }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/91a56360/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/91a56360/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/91a56360/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/91a56360/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------


[6/6] git commit: Merge pull request #395 from hsaputra/remove_simpleredundantreturn_scala

Posted by pw...@apache.org.
Merge pull request #395 from hsaputra/remove_simpleredundantreturn_scala

Remove simple redundant return statements for Scala methods/functions

Remove simple redundant return statements for Scala methods/functions:

-) Only change simple return statements at the end of a method
-) Ignore complex if-else checks
-) Ignore the ones inside synchronized blocks
-) Add small changes converting var to val where possible and removing () for simple getters

This hopefully makes the review simpler =)

Passes compilation and tests.
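
As a minimal sketch of the pattern this series applies (object and method names here are illustrative only, loosely modeled on the HttpFileServer and TorrentBroadcast hunks further down): in Scala the last expression of a method body is its result, so a trailing return can simply be dropped, and a local that is never reassigned becomes a val.

object RedundantReturnSketch {
  // Before: explicit trailing return and an unnecessary var.
  def addFileOld(serverUri: String, name: String): String = {
    var uri = serverUri + "/files/" + name
    return uri
  }

  // After: the final expression is the result, and the local is a val.
  def addFile(serverUri: String, name: String): String = {
    val uri = serverUri + "/files/" + name
    uri
  }

  def main(args: Array[String]): Unit = {
    println(addFileOld("http://host:8080", "a.txt"))
    println(addFile("http://host:8080", "a.txt"))
  }
}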


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/0ab505a2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/0ab505a2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/0ab505a2

Branch: refs/heads/master
Commit: 0ab505a29e21b5a03928e0bbd3950f6f8e08ae32
Parents: 405bfe8 5a8abfb
Author: Patrick Wendell <pw...@gmail.com>
Authored: Sun Jan 12 21:31:04 2014 -0800
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Sun Jan 12 21:31:04 2014 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/Accumulators.scala   |  2 +-
 .../scala/org/apache/spark/CacheManager.scala   |  4 +--
 .../scala/org/apache/spark/HttpFileServer.scala |  6 ++--
 .../main/scala/org/apache/spark/Logging.scala   |  2 +-
 .../org/apache/spark/MapOutputTracker.scala     |  2 +-
 .../scala/org/apache/spark/Partitioner.scala    |  4 +--
 .../scala/org/apache/spark/SparkContext.scala   | 11 +++++---
 .../org/apache/spark/SparkHadoopWriter.scala    | 15 +++++-----
 .../org/apache/spark/api/python/PythonRDD.scala |  3 +-
 .../spark/broadcast/TorrentBroadcast.scala      |  6 ++--
 .../apache/spark/network/BufferMessage.scala    |  2 +-
 .../org/apache/spark/network/Connection.scala   |  6 ++--
 .../org/apache/spark/network/Message.scala      |  6 ++--
 .../spark/network/netty/ShuffleSender.scala     |  2 +-
 .../org/apache/spark/rdd/CoalescedRDD.scala     | 10 +++----
 .../scala/org/apache/spark/rdd/HadoopRDD.scala  |  8 +++---
 .../scala/org/apache/spark/rdd/PipedRDD.scala   |  2 +-
 .../main/scala/org/apache/spark/rdd/RDD.scala   |  4 +--
 .../spark/scheduler/InputFormatInfo.scala       |  8 +++---
 .../scala/org/apache/spark/scheduler/Pool.scala |  8 +++---
 .../spark/scheduler/SchedulingAlgorithm.scala   | 11 ++++----
 .../spark/scheduler/SparkListenerBus.scala      |  2 +-
 .../org/apache/spark/scheduler/Stage.scala      |  2 +-
 .../org/apache/spark/scheduler/TaskResult.scala |  2 +-
 .../spark/scheduler/TaskResultGetter.scala      |  2 +-
 .../apache/spark/scheduler/TaskSetManager.scala | 14 +++++-----
 .../mesos/CoarseMesosSchedulerBackend.scala     |  2 +-
 .../cluster/mesos/MesosSchedulerBackend.scala   |  6 ++--
 .../org/apache/spark/storage/BlockManager.scala |  2 +-
 .../spark/storage/BlockManagerWorker.scala      | 20 +++++++-------
 .../org/apache/spark/storage/BlockMessage.scala |  2 +-
 .../spark/storage/BlockMessageArray.scala       |  2 +-
 .../org/apache/spark/storage/MemoryStore.scala  |  2 +-
 .../org/apache/spark/storage/StorageLevel.scala |  2 +-
 .../org/apache/spark/util/ClosureCleaner.scala  | 10 +++----
 .../org/apache/spark/util/SizeEstimator.scala   | 10 +++----
 .../scala/org/apache/spark/util/Utils.scala     | 19 ++++++-------
 .../scala/org/apache/spark/util/Vector.scala    | 12 ++++----
 .../spark/util/collection/AppendOnlyMap.scala   |  2 +-
 .../spark/scheduler/ClusterSchedulerSuite.scala |  9 +++---
 .../spark/scheduler/DAGSchedulerSuite.scala     |  3 +-
 .../apache/spark/scheduler/JobLoggerSuite.scala |  7 ++---
 .../apache/spark/util/ClosureCleanerSuite.scala | 14 +++++-----
 .../org/apache/spark/examples/LocalALS.scala    |  8 +++---
 .../org/apache/spark/examples/LocalFileLR.scala |  2 +-
 .../org/apache/spark/examples/LocalKMeans.scala |  2 +-
 .../org/apache/spark/examples/SparkALS.scala    |  6 ++--
 .../org/apache/spark/examples/SparkHdfsLR.scala |  2 +-
 .../org/apache/spark/examples/SparkKMeans.scala | 12 ++++----
 .../clickstream/PageViewGenerator.scala         |  2 +-
 .../spark/mllib/api/python/PythonMLLibAPI.scala | 29 ++++++++++----------
 .../org/apache/spark/streaming/Checkpoint.scala |  2 +-
 .../streaming/dstream/FileInputDStream.scala    |  2 +-
 .../spark/streaming/dstream/StateDStream.scala  |  8 +++---
 .../scheduler/StreamingListenerBus.scala        |  2 +-
 .../org/apache/spark/streaming/util/Clock.scala |  4 +--
 .../spark/streaming/util/RawTextHelper.scala    |  2 +-
 .../org/apache/spark/deploy/yarn/Client.scala   |  7 +++--
 .../spark/deploy/yarn/WorkerLauncher.scala      |  8 +++---
 .../spark/deploy/yarn/WorkerRunnable.scala      |  7 ++---
 .../yarn/ClientDistributedCacheManager.scala    | 10 +++----
 .../ClientDistributedCacheManagerSuite.scala    |  2 +-
 .../org/apache/spark/deploy/yarn/Client.scala   |  3 +-
 63 files changed, 194 insertions(+), 194 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0ab505a2/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
index fc63788,e551c11..17b1328
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
@@@ -93,8 -88,6 +93,8 @@@ private[spark] class SparkListenerBus e
         * add overhead in the general case. */
        Thread.sleep(10)
      }
-     return true
+     true
    }
 +
 +  def stop(): Unit = post(SparkListenerShutdown)
  }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0ab505a2/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0ab505a2/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
----------------------------------------------------------------------
diff --cc streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
index 6e6e22e,73dc520..3063cf1
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
@@@ -83,8 -76,6 +83,8 @@@ private[spark] class StreamingListenerB
         * add overhead in the general case. */
        Thread.sleep(10)
      }
-     return true
+     true
    }
 +
 +  def stop(): Unit = post(StreamingListenerShutdown)
  }


[2/6] git commit: Remove simple redundant return statement for Scala methods/functions:

Posted by pw...@apache.org.
Remove simple redundant return statement for Scala methods/functions:

-) Only change simple return statements at the end of a method
-) Ignore complex if-else checks
-) Ignore the ones inside synchronized blocks
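
A short hedged illustration of the first two rules (hypothetical method and parameter names, loosely modeled on the Partitioner.scala hunk below): a trailing if/else is an expression whose branches become the method result, so those returns are dropped, while an early-exit guard earlier in the body keeps its explicit return.

object ReturnRulesSketch {
  def choosePartitions(hasDefault: Boolean, defaultParallelism: Int, largestRddSize: Int): Int = {
    if (largestRddSize == 0) {
      return 1 // early exit: the explicit return stays
    }
    // Trailing if/else: each branch is the method's result, no return needed.
    if (hasDefault) {
      defaultParallelism
    } else {
      largestRddSize
    }
  }

  def main(args: Array[String]): Unit = {
    println(choosePartitions(hasDefault = true, defaultParallelism = 8, largestRddSize = 4))
  }
}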


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/93a65e5f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/93a65e5f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/93a65e5f

Branch: refs/heads/master
Commit: 93a65e5fde64ffed3dbd2a050c1007e077ecd004
Parents: 26cdb5f
Author: Henry Saputra <hs...@apache.org>
Authored: Sun Jan 12 10:30:04 2014 -0800
Committer: Henry Saputra <hs...@apache.org>
Committed: Sun Jan 12 10:30:04 2014 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/Accumulators.scala   |  2 +-
 .../scala/org/apache/spark/CacheManager.scala   |  4 +--
 .../scala/org/apache/spark/HttpFileServer.scala |  6 ++--
 .../main/scala/org/apache/spark/Logging.scala   |  2 +-
 .../org/apache/spark/MapOutputTracker.scala     |  4 +--
 .../scala/org/apache/spark/Partitioner.scala    |  4 +--
 .../scala/org/apache/spark/SparkContext.scala   | 11 +++++---
 .../org/apache/spark/SparkHadoopWriter.scala    | 13 ++++-----
 .../org/apache/spark/api/python/PythonRDD.scala |  2 +-
 .../spark/broadcast/TorrentBroadcast.scala      |  6 ++--
 .../apache/spark/network/BufferMessage.scala    |  2 +-
 .../org/apache/spark/network/Connection.scala   |  6 ++--
 .../org/apache/spark/network/Message.scala      |  6 ++--
 .../spark/network/netty/ShuffleSender.scala     |  2 +-
 .../org/apache/spark/rdd/CoalescedRDD.scala     |  4 +--
 .../scala/org/apache/spark/rdd/HadoopRDD.scala  |  8 +++---
 .../scala/org/apache/spark/rdd/PipedRDD.scala   |  2 +-
 .../main/scala/org/apache/spark/rdd/RDD.scala   |  4 +--
 .../spark/scheduler/InputFormatInfo.scala       |  8 +++---
 .../scala/org/apache/spark/scheduler/Pool.scala |  8 +++---
 .../spark/scheduler/SchedulingAlgorithm.scala   | 11 ++++----
 .../spark/scheduler/SparkListenerBus.scala      |  2 +-
 .../org/apache/spark/scheduler/Stage.scala      |  2 +-
 .../org/apache/spark/scheduler/TaskResult.scala |  2 +-
 .../spark/scheduler/TaskResultGetter.scala      |  2 +-
 .../apache/spark/scheduler/TaskSetManager.scala | 14 +++++-----
 .../mesos/CoarseMesosSchedulerBackend.scala     |  2 +-
 .../cluster/mesos/MesosSchedulerBackend.scala   |  6 ++--
 .../org/apache/spark/storage/BlockManager.scala |  2 +-
 .../spark/storage/BlockManagerWorker.scala      | 14 +++++-----
 .../org/apache/spark/storage/BlockMessage.scala |  2 +-
 .../spark/storage/BlockMessageArray.scala       |  2 +-
 .../org/apache/spark/storage/MemoryStore.scala  |  2 +-
 .../org/apache/spark/storage/StorageLevel.scala |  2 +-
 .../org/apache/spark/util/AppendOnlyMap.scala   |  2 +-
 .../org/apache/spark/util/ClosureCleaner.scala  | 10 +++----
 .../org/apache/spark/util/SizeEstimator.scala   | 10 +++----
 .../scala/org/apache/spark/util/Utils.scala     | 19 ++++++-------
 .../scala/org/apache/spark/util/Vector.scala    | 12 ++++----
 .../spark/scheduler/ClusterSchedulerSuite.scala |  9 +++---
 .../spark/scheduler/DAGSchedulerSuite.scala     |  2 +-
 .../apache/spark/scheduler/JobLoggerSuite.scala |  2 +-
 .../apache/spark/util/ClosureCleanerSuite.scala | 14 +++++-----
 .../org/apache/spark/examples/LocalALS.scala    |  8 +++---
 .../org/apache/spark/examples/LocalFileLR.scala |  2 +-
 .../org/apache/spark/examples/LocalKMeans.scala |  2 +-
 .../org/apache/spark/examples/SparkALS.scala    |  6 ++--
 .../org/apache/spark/examples/SparkHdfsLR.scala |  2 +-
 .../org/apache/spark/examples/SparkKMeans.scala | 12 ++++----
 .../clickstream/PageViewGenerator.scala         |  2 +-
 .../spark/mllib/api/python/PythonMLLibAPI.scala | 29 ++++++++++----------
 .../org/apache/spark/streaming/Checkpoint.scala |  2 +-
 .../streaming/dstream/FileInputDStream.scala    |  2 +-
 .../spark/streaming/dstream/StateDStream.scala  |  8 +++---
 .../scheduler/StreamingListenerBus.scala        |  2 +-
 .../org/apache/spark/streaming/util/Clock.scala |  4 +--
 .../spark/streaming/util/RawTextHelper.scala    |  2 +-
 .../org/apache/spark/deploy/yarn/Client.scala   |  9 +++---
 .../spark/deploy/yarn/WorkerLauncher.scala      |  8 +++---
 .../spark/deploy/yarn/WorkerRunnable.scala      |  7 ++---
 .../yarn/ClientDistributedCacheManager.scala    | 10 +++----
 .../ClientDistributedCacheManagerSuite.scala    |  2 +-
 .../org/apache/spark/deploy/yarn/Client.scala   |  5 ++--
 63 files changed, 187 insertions(+), 186 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/core/src/main/scala/org/apache/spark/Accumulators.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Accumulators.scala b/core/src/main/scala/org/apache/spark/Accumulators.scala
index 5f73d23..e89ac28 100644
--- a/core/src/main/scala/org/apache/spark/Accumulators.scala
+++ b/core/src/main/scala/org/apache/spark/Accumulators.scala
@@ -218,7 +218,7 @@ private object Accumulators {
 
   def newId: Long = synchronized {
     lastId += 1
-    return lastId
+    lastId
   }
 
   def register(a: Accumulable[_, _], original: Boolean): Unit = synchronized {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/core/src/main/scala/org/apache/spark/CacheManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala
index 519ecde..8e5dd8a 100644
--- a/core/src/main/scala/org/apache/spark/CacheManager.scala
+++ b/core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -38,7 +38,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
     blockManager.get(key) match {
       case Some(values) =>
         // Partition is already materialized, so just return its values
-        return new InterruptibleIterator(context, values.asInstanceOf[Iterator[T]])
+        new InterruptibleIterator(context, values.asInstanceOf[Iterator[T]])
 
       case None =>
         // Mark the split as loading (unless someone else marks it first)
@@ -74,7 +74,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
           val elements = new ArrayBuffer[Any]
           elements ++= computedValues
           blockManager.put(key, elements, storageLevel, tellMaster = true)
-          return elements.iterator.asInstanceOf[Iterator[T]]
+          elements.iterator.asInstanceOf[Iterator[T]]
         } finally {
           loading.synchronized {
             loading.remove(key)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/core/src/main/scala/org/apache/spark/HttpFileServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/HttpFileServer.scala b/core/src/main/scala/org/apache/spark/HttpFileServer.scala
index ad1ee20..a885898 100644
--- a/core/src/main/scala/org/apache/spark/HttpFileServer.scala
+++ b/core/src/main/scala/org/apache/spark/HttpFileServer.scala
@@ -47,17 +47,17 @@ private[spark] class HttpFileServer extends Logging {
   
   def addFile(file: File) : String = {
     addFileToDir(file, fileDir)
-    return serverUri + "/files/" + file.getName
+    serverUri + "/files/" + file.getName
   }
   
   def addJar(file: File) : String = {
     addFileToDir(file, jarDir)
-    return serverUri + "/jars/" + file.getName
+    serverUri + "/jars/" + file.getName
   }
   
   def addFileToDir(file: File, dir: File) : String = {
     Files.copy(file, new File(dir, file.getName))
-    return dir + "/" + file.getName
+    dir + "/" + file.getName
   }
   
 }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/core/src/main/scala/org/apache/spark/Logging.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Logging.scala b/core/src/main/scala/org/apache/spark/Logging.scala
index 4a34989..9063cae 100644
--- a/core/src/main/scala/org/apache/spark/Logging.scala
+++ b/core/src/main/scala/org/apache/spark/Logging.scala
@@ -41,7 +41,7 @@ trait Logging {
       }
       log_ = LoggerFactory.getLogger(className)
     }
-    return log_
+    log_
   }
 
   // Log methods that take only a String

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index 77b8ca1..57bdf22 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -139,7 +139,7 @@ private[spark] class MapOutputTracker(conf: SparkConf) extends Logging {
           return MapOutputTracker.convertMapStatuses(shuffleId, reduceId, fetchedStatuses)
         }
       }
-      else{
+      else {
         throw new FetchFailedException(null, shuffleId, -1, reduceId,
           new Exception("Missing all output locations for shuffle " + shuffleId))
       }
@@ -185,7 +185,7 @@ private[spark] class MapOutputTracker(conf: SparkConf) extends Logging {
 private[spark] class MapOutputTrackerMaster(conf: SparkConf)
   extends MapOutputTracker(conf) {
 
-  // Cache a serialized version of the output statuses for each shuffle to send them out faster
+  // Cache a serialized version of the output statuses for each shuffle to send them out faster                          return
   private var cacheEpoch = epoch
   private val cachedSerializedStatuses = new TimeStampedHashMap[Int, Array[Byte]]
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/core/src/main/scala/org/apache/spark/Partitioner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala
index 9b043f0..fc0a749 100644
--- a/core/src/main/scala/org/apache/spark/Partitioner.scala
+++ b/core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -53,9 +53,9 @@ object Partitioner {
       return r.partitioner.get
     }
     if (rdd.context.conf.contains("spark.default.parallelism")) {
-      return new HashPartitioner(rdd.context.defaultParallelism)
+      new HashPartitioner(rdd.context.defaultParallelism)
     } else {
-      return new HashPartitioner(bySize.head.partitions.size)
+      new HashPartitioner(bySize.head.partitions.size)
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index f91392b..3d82bfc 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -756,8 +756,11 @@ class SparkContext(
 
   private[spark] def getCallSite(): String = {
     val callSite = getLocalProperty("externalCallSite")
-    if (callSite == null) return Utils.formatSparkCallSite
-    callSite
+    if (callSite == null) {
+      Utils.formatSparkCallSite
+    } else {
+      callSite
+    }
   }
 
   /**
@@ -907,7 +910,7 @@ class SparkContext(
    */
   private[spark] def clean[F <: AnyRef](f: F): F = {
     ClosureCleaner.clean(f)
-    return f
+    f
   }
 
   /**
@@ -919,7 +922,7 @@ class SparkContext(
       val path = new Path(dir, UUID.randomUUID().toString)
       val fs = path.getFileSystem(hadoopConfiguration)
       fs.mkdirs(path)
-      fs.getFileStatus(path).getPath().toString
+      fs.getFileStatus(path).getPath.toString
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
index 618d950..bba873a 100644
--- a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
+++ b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
@@ -134,28 +134,28 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
       format = conf.value.getOutputFormat()
         .asInstanceOf[OutputFormat[AnyRef,AnyRef]]
     }
-    return format 
+    format
   }
 
   private def getOutputCommitter(): OutputCommitter = {
     if (committer == null) {
       committer = conf.value.getOutputCommitter
     }
-    return committer
+    committer
   }
 
   private def getJobContext(): JobContext = {
     if (jobContext == null) { 
       jobContext = newJobContext(conf.value, jID.value)
     }
-    return jobContext
+    jobContext
   }
 
   private def getTaskContext(): TaskAttemptContext = {
     if (taskContext == null) {
       taskContext =  newTaskAttemptContext(conf.value, taID.value)
     }
-    return taskContext
+    taskContext
   }
 
   private def setIDs(jobid: Int, splitid: Int, attemptid: Int) {
@@ -182,7 +182,7 @@ object SparkHadoopWriter {
   def createJobID(time: Date, id: Int): JobID = {
     val formatter = new SimpleDateFormat("yyyyMMddHHmm")
     val jobtrackerID = formatter.format(new Date())
-    return new JobID(jobtrackerID, id)
+    new JobID(jobtrackerID, id)
   }
   
   def createPathFromString(path: String, conf: JobConf): Path = {
@@ -194,7 +194,6 @@ object SparkHadoopWriter {
     if (outputPath == null || fs == null) {
       throw new IllegalArgumentException("Incorrectly formatted output path")
     }
-    outputPath = outputPath.makeQualified(fs)
-    return outputPath
+    outputPath.makeQualified(fs)
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 40c519b..8830de7 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -95,7 +95,7 @@ private[spark] class PythonRDD[T: ClassTag](
 
     // Return an iterator that read lines from the process's stdout
     val stream = new DataInputStream(new BufferedInputStream(worker.getInputStream, bufferSize))
-    return new Iterator[Array[Byte]] {
+    new Iterator[Array[Byte]] {
       def next(): Array[Byte] = {
         val obj = _nextObj
         if (hasNext) {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
index fdf92ec..1d295c6 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
@@ -203,16 +203,16 @@ extends Logging {
     }
     bais.close()
 
-    var tInfo = TorrentInfo(retVal, blockNum, byteArray.length)
+    val tInfo = TorrentInfo(retVal, blockNum, byteArray.length)
     tInfo.hasBlocks = blockNum
 
-    return tInfo
+    tInfo
   }
 
   def unBlockifyObject[T](arrayOfBlocks: Array[TorrentBlock],
                             totalBytes: Int,
                             totalBlocks: Int): T = {
-    var retByteArray = new Array[Byte](totalBytes)
+    val retByteArray = new Array[Byte](totalBytes)
     for (i <- 0 until totalBlocks) {
       System.arraycopy(arrayOfBlocks(i).byteArray, 0, retByteArray,
         i * BLOCK_SIZE, arrayOfBlocks(i).byteArray.length)
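
The same cleanup also tightens `var` to `val` where a local is never reassigned, as with `tInfo` and `retByteArray` above. A hedged, illustrative sketch of the distinction (names are not from the patch):

    object ValOverVarExample {
      class TorrentInfoLike(val blockNum: Int) { var hasBlocks: Int = 0 }

      def main(args: Array[String]): Unit = {
        val info = new TorrentInfoLike(blockNum = 4)  // `val`: the name is never rebound...
        info.hasBlocks = info.blockNum                // ...but a `var` field of the object may still change
        println(info.hasBlocks)
      }
    }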

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/core/src/main/scala/org/apache/spark/network/BufferMessage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/BufferMessage.scala b/core/src/main/scala/org/apache/spark/network/BufferMessage.scala
index f736bb3..fb4c659 100644
--- a/core/src/main/scala/org/apache/spark/network/BufferMessage.scala
+++ b/core/src/main/scala/org/apache/spark/network/BufferMessage.scala
@@ -46,7 +46,7 @@ class BufferMessage(id_ : Int, val buffers: ArrayBuffer[ByteBuffer], var ackId:
       throw new Exception("Max chunk size is " + maxChunkSize)
     }
 
-    if (size == 0 && gotChunkForSendingOnce == false) {
+    if (size == 0 && !gotChunkForSendingOnce) {
       val newChunk = new MessageChunk(
         new MessageChunkHeader(typ, id, 0, 0, ackId, senderAddress), null)
       gotChunkForSendingOnce = true
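
A related micro-cleanup: comparing a Boolean to a literal is redundant, so `gotChunkForSendingOnce == false` becomes `!gotChunkForSendingOnce`. A small sketch with illustrative names:

    object BooleanTestExample {
      def main(args: Array[String]): Unit = {
        val size = 0
        val sentEmptyChunkOnce = false
        if (size == 0 && !sentEmptyChunkOnce) {  // rather than `sentEmptyChunkOnce == false`
          println("would send one empty chunk")
        }
      }
    }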

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/core/src/main/scala/org/apache/spark/network/Connection.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/Connection.scala b/core/src/main/scala/org/apache/spark/network/Connection.scala
index 95cb020..cba8477 100644
--- a/core/src/main/scala/org/apache/spark/network/Connection.scala
+++ b/core/src/main/scala/org/apache/spark/network/Connection.scala
@@ -330,7 +330,7 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector,
       // Is highly unlikely unless there was an unclean close of socket, etc
       registerInterest()
       logInfo("Connected to [" + address + "], " + outbox.messages.size + " messages pending")
-      return true
+      true
     } catch {
       case e: Exception => {
         logWarning("Error finishing connection to " + address, e)
@@ -385,7 +385,7 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector,
       }
     }
     // should not happen - to keep scala compiler happy
-    return true
+    true
   }
 
   // This is a hack to determine if remote socket was closed or not.
@@ -559,7 +559,7 @@ private[spark] class ReceivingConnection(channel_ : SocketChannel, selector_ : S
       }
     }
     // should not happen - to keep scala compiler happy
-    return true
+    true
   }
 
   def onReceive(callback: (Connection, Message) => Unit) {onReceiveCallback = callback}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/core/src/main/scala/org/apache/spark/network/Message.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/Message.scala b/core/src/main/scala/org/apache/spark/network/Message.scala
index f2ecc6d..2612884 100644
--- a/core/src/main/scala/org/apache/spark/network/Message.scala
+++ b/core/src/main/scala/org/apache/spark/network/Message.scala
@@ -61,7 +61,7 @@ private[spark] object Message {
     if (dataBuffers.exists(_ == null)) {
       throw new Exception("Attempting to create buffer message with null buffer")
     }
-    return new BufferMessage(getNewId(), new ArrayBuffer[ByteBuffer] ++= dataBuffers, ackId)
+    new BufferMessage(getNewId(), new ArrayBuffer[ByteBuffer] ++= dataBuffers, ackId)
   }
 
   def createBufferMessage(dataBuffers: Seq[ByteBuffer]): BufferMessage =
@@ -69,9 +69,9 @@ private[spark] object Message {
 
   def createBufferMessage(dataBuffer: ByteBuffer, ackId: Int): BufferMessage = {
     if (dataBuffer == null) {
-      return createBufferMessage(Array(ByteBuffer.allocate(0)), ackId)
+      createBufferMessage(Array(ByteBuffer.allocate(0)), ackId)
     } else {
-      return createBufferMessage(Array(dataBuffer), ackId)
+      createBufferMessage(Array(dataBuffer), ackId)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala b/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala
index 546d921..44204a8 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala
@@ -64,7 +64,7 @@ private[spark] object ShuffleSender {
         val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
         val subDir = new File(localDirs(dirId), "%02x".format(subDirId))
         val file = new File(subDir, blockId.name)
-        return new FileSegment(file, 0, file.length())
+        new FileSegment(file, 0, file.length())
       }
     }
     val sender = new ShuffleSender(port, pResovler)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
index 98da357..a5394a2 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
@@ -296,9 +296,9 @@ private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanc
     val prefPartActual = prefPart.get
 
     if (minPowerOfTwo.size + slack <= prefPartActual.size)  // more imbalance than the slack allows
-      return minPowerOfTwo  // prefer balance over locality
+      minPowerOfTwo  // prefer balance over locality
     else {
-      return prefPartActual // prefer locality over balance
+      prefPartActual // prefer locality over balance
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 53f77a3..20db7db 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -99,11 +99,11 @@ class HadoopRDD[K, V](
     val conf: Configuration = broadcastedConf.value.value
     if (conf.isInstanceOf[JobConf]) {
       // A user-broadcasted JobConf was provided to the HadoopRDD, so always use it.
-      return conf.asInstanceOf[JobConf]
+      conf.asInstanceOf[JobConf]
     } else if (HadoopRDD.containsCachedMetadata(jobConfCacheKey)) {
       // getJobConf() has been called previously, so there is already a local cache of the JobConf
       // needed by this RDD.
-      return HadoopRDD.getCachedMetadata(jobConfCacheKey).asInstanceOf[JobConf]
+      HadoopRDD.getCachedMetadata(jobConfCacheKey).asInstanceOf[JobConf]
     } else {
       // Create a JobConf that will be cached and used across this RDD's getJobConf() calls in the
       // local process. The local cache is accessed through HadoopRDD.putCachedMetadata().
@@ -111,7 +111,7 @@ class HadoopRDD[K, V](
       val newJobConf = new JobConf(broadcastedConf.value.value)
       initLocalJobConfFuncOpt.map(f => f(newJobConf))
       HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
-      return newJobConf
+      newJobConf
     }
   }
 
@@ -127,7 +127,7 @@ class HadoopRDD[K, V](
       newInputFormat.asInstanceOf[Configurable].setConf(conf)
     }
     HadoopRDD.putCachedMetadata(inputFormatCacheKey, newInputFormat)
-    return newInputFormat
+    newInputFormat
   }
 
   override def getPartitions: Array[Partition] = {
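
The getJobConf change above also relies on the fact that an if / else if / else chain is itself an expression, so each branch can simply end in the value to produce — here the cached or newly created configuration. A simplified, generic sketch of that lookup-or-create shape (not the HadoopRDD code):

    import scala.collection.mutable

    object CacheOrCreateExample {
      private val cache = mutable.Map.empty[String, String]

      def lookupOrCreate(key: String): String = {
        if (cache.contains(key)) {
          cache(key)              // each branch ends in the value; no `return`
        } else {
          val created = s"value-for-$key"
          cache.put(key, created)
          created
        }
      }

      def main(args: Array[String]): Unit = {
        println(lookupOrCreate("a"))
        println(lookupOrCreate("a"))  // second call hits the cache
      }
    }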

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
index 1dbbe39..d4f396a 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
@@ -96,7 +96,7 @@ class PipedRDD[T: ClassTag](
 
     // Return an iterator that read lines from the process's stdout
     val lines = Source.fromInputStream(proc.getInputStream).getLines
-    return new Iterator[String] {
+    new Iterator[String] {
       def next() = lines.next()
       def hasNext = {
         if (lines.hasNext) {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/core/src/main/scala/org/apache/spark/rdd/RDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index f9dc12e..edd4f38 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -764,7 +764,7 @@ abstract class RDD[T: ClassTag](
         val entry = iter.next()
         m1.put(entry.getKey, m1.getLong(entry.getKey) + entry.getLongValue)
       }
-      return m1
+      m1
     }
     val myResult = mapPartitions(countPartition).reduce(mergeMaps)
     myResult.asInstanceOf[java.util.Map[T, Long]]   // Will be wrapped as a Scala mutable Map
@@ -842,7 +842,7 @@ abstract class RDD[T: ClassTag](
       partsScanned += numPartsToTry
     }
 
-    return buf.toArray
+    buf.toArray
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala
index 90eb8a7..cc10cc0 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala
@@ -103,7 +103,7 @@ class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Cl
       retval ++= SplitInfo.toSplitInfo(inputFormatClazz, path, split)
     }
 
-    return retval.toSet
+    retval.toSet
   }
 
   // This method does not expect failures, since validate has already passed ...
@@ -121,18 +121,18 @@ class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Cl
         elem => retval ++= SplitInfo.toSplitInfo(inputFormatClazz, path, elem)
     )
 
-    return retval.toSet
+    retval.toSet
    }
 
   private def findPreferredLocations(): Set[SplitInfo] = {
     logDebug("mapreduceInputFormat : " + mapreduceInputFormat + ", mapredInputFormat : " + mapredInputFormat +
       ", inputFormatClazz : " + inputFormatClazz)
     if (mapreduceInputFormat) {
-      return prefLocsFromMapreduceInputFormat()
+      prefLocsFromMapreduceInputFormat()
     }
     else {
       assert(mapredInputFormat)
-      return prefLocsFromMapredInputFormat()
+      prefLocsFromMapredInputFormat()
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Pool.scala b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
index 1791242..4bc13c2 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
@@ -75,12 +75,12 @@ private[spark] class Pool(
       return schedulableNameToSchedulable(schedulableName)
     }
     for (schedulable <- schedulableQueue) {
-      var sched = schedulable.getSchedulableByName(schedulableName)
+      val sched = schedulable.getSchedulableByName(schedulableName)
       if (sched != null) {
         return sched
       }
     }
-    return null
+    null
   }
 
   override def executorLost(executorId: String, host: String) {
@@ -92,7 +92,7 @@ private[spark] class Pool(
     for (schedulable <- schedulableQueue) {
       shouldRevive |= schedulable.checkSpeculatableTasks()
     }
-    return shouldRevive
+    shouldRevive
   }
 
   override def getSortedTaskSetQueue(): ArrayBuffer[TaskSetManager] = {
@@ -101,7 +101,7 @@ private[spark] class Pool(
     for (schedulable <- sortedSchedulableQueue) {
       sortedTaskSetQueue ++= schedulable.getSortedTaskSetQueue()
     }
-    return sortedTaskSetQueue
+    sortedTaskSetQueue
   }
 
   def increaseRunningTasks(taskNum: Int) {
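
Note that only the trailing `return`s are dropped: an early `return` inside a loop, as in getSchedulableByName above, still needs the keyword because it exits before the method's last expression. A sketch of that mixed shape, with types simplified:

    object EarlyReturnExample {
      def findByName(names: Seq[String], wanted: String): String = {
        for (n <- names) {
          if (n == wanted) {
            return n   // early exit still requires `return`
          }
        }
        null             // the trailing value needs no `return`
      }

      def main(args: Array[String]): Unit =
        println(findByName(Seq("pool-1", "pool-2"), "pool-2"))
    }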

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/core/src/main/scala/org/apache/spark/scheduler/SchedulingAlgorithm.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulingAlgorithm.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulingAlgorithm.scala
index 3418640..5e62c84 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SchedulingAlgorithm.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulingAlgorithm.scala
@@ -37,9 +37,9 @@ private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm {
       res = math.signum(stageId1 - stageId2)
     }
     if (res < 0) {
-      return true
+      true
     } else {
-      return false
+      false
     }
   }
 }
@@ -56,7 +56,6 @@ private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm {
     val minShareRatio2 = runningTasks2.toDouble / math.max(minShare2, 1.0).toDouble
     val taskToWeightRatio1 = runningTasks1.toDouble / s1.weight.toDouble
     val taskToWeightRatio2 = runningTasks2.toDouble / s2.weight.toDouble
-    var res:Boolean = true
     var compare:Int = 0
 
     if (s1Needy && !s2Needy) {
@@ -70,11 +69,11 @@ private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm {
     }
 
     if (compare < 0) {
-      return true
+      true
     } else if (compare > 0) {
-      return false
+      false
     } else {
-      return s1.name < s2.name
+      s1.name < s2.name
     }
   }
 }
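
In FairSchedulingAlgorithm the unused `var res: Boolean = true` was dead code and is removed; in the FIFO comparator the final if/else now yields the Boolean directly. As a further simplification not made in this commit, the comparison itself could be the result — sketch only:

    object FifoComparatorSketch {
      def lessThan(stageId1: Int, stageId2: Int): Boolean = {
        val res = math.signum(stageId1 - stageId2)
        res < 0   // already the Boolean we want; no if/else needed
      }

      def main(args: Array[String]): Unit = {
        println(lessThan(1, 2))  // true
        println(lessThan(3, 2))  // false
      }
    }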

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
index e7defd7..e551c11 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
@@ -88,6 +88,6 @@ private[spark] class SparkListenerBus() extends Logging {
        * add overhead in the general case. */
       Thread.sleep(10)
     }
-    return true
+    true
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
index 7cb3fe4..c60e989 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
@@ -96,7 +96,7 @@ private[spark] class Stage(
   def newAttemptId(): Int = {
     val id = nextAttemptId
     nextAttemptId += 1
-    return id
+    id
   }
 
   val name = callSite.getOrElse(rdd.origin)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
index e80cc6b..9d3e615 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
@@ -74,6 +74,6 @@ class DirectTaskResult[T](var valueBytes: ByteBuffer, var accumUpdates: Map[Long
 
   def value(): T = {
     val resultSer = SparkEnv.get.serializer.newInstance()
-    return resultSer.deserialize(valueBytes)
+    resultSer.deserialize(valueBytes)
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
index c52d617..35e9544 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
@@ -37,7 +37,7 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul
 
   protected val serializer = new ThreadLocal[SerializerInstance] {
     override def initialValue(): SerializerInstance = {
-      return sparkEnv.closureSerializer.newInstance()
+      sparkEnv.closureSerializer.newInstance()
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index a10e539..fc0ee07 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -228,7 +228,7 @@ private[spark] class TaskSetManager(
         return Some(index)
       }
     }
-    return None
+    None
   }
 
   /** Check whether a task is currently running an attempt on a given host */
@@ -291,7 +291,7 @@ private[spark] class TaskSetManager(
       }
     }
 
-    return None
+    None
   }
 
   /**
@@ -332,7 +332,7 @@ private[spark] class TaskSetManager(
     }
 
     // Finally, if all else has failed, find a speculative task
-    return findSpeculativeTask(execId, host, locality)
+    findSpeculativeTask(execId, host, locality)
   }
 
   /**
@@ -387,7 +387,7 @@ private[spark] class TaskSetManager(
         case _ =>
       }
     }
-    return None
+    None
   }
 
   /**
@@ -584,7 +584,7 @@ private[spark] class TaskSetManager(
   }
 
   override def getSchedulableByName(name: String): Schedulable = {
-    return null
+    null
   }
 
   override def addSchedulable(schedulable: Schedulable) {}
@@ -594,7 +594,7 @@ private[spark] class TaskSetManager(
   override def getSortedTaskSetQueue(): ArrayBuffer[TaskSetManager] = {
     var sortedTaskSetQueue = ArrayBuffer[TaskSetManager](this)
     sortedTaskSetQueue += this
-    return sortedTaskSetQueue
+    sortedTaskSetQueue
   }
 
   /** Called by TaskScheduler when an executor is lost so we can re-enqueue our tasks */
@@ -669,7 +669,7 @@ private[spark] class TaskSetManager(
         }
       }
     }
-    return foundTasks
+    foundTasks
   }
 
   private def getLocalityWait(level: TaskLocality.TaskLocality): Long = {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index e16d60c..c27049b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -140,7 +140,7 @@ private[spark] class CoarseMesosSchedulerBackend(
           .format(basename, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores))
       command.addUris(CommandInfo.URI.newBuilder().setValue(uri))
     }
-    return command.build()
+    command.build()
   }
 
   override def offerRescinded(d: SchedulerDriver, o: OfferID) {}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
index b428c82..4978148 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@ -141,13 +141,13 @@ private[spark] class MesosSchedulerBackend(
       // Serialize the map as an array of (String, String) pairs
       execArgs = Utils.serialize(props.toArray)
     }
-    return execArgs
+    execArgs
   }
 
   private def setClassLoader(): ClassLoader = {
     val oldClassLoader = Thread.currentThread.getContextClassLoader
     Thread.currentThread.setContextClassLoader(classLoader)
-    return oldClassLoader
+    oldClassLoader
   }
 
   private def restoreClassLoader(oldClassLoader: ClassLoader) {
@@ -255,7 +255,7 @@ private[spark] class MesosSchedulerBackend(
       .setType(Value.Type.SCALAR)
       .setScalar(Value.Scalar.newBuilder().setValue(1).build())
       .build()
-    return MesosTaskInfo.newBuilder()
+    MesosTaskInfo.newBuilder()
       .setTaskId(taskId)
       .setSlaveId(SlaveID.newBuilder().setValue(slaveId).build())
       .setExecutor(createExecutorInfo(slaveId))

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index c56e2ca..a716b1d 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -412,7 +412,7 @@ private[spark] class BlockManager(
       logDebug("The value of block " + blockId + " is null")
     }
     logDebug("Block " + blockId + " not found")
-    return None
+    None
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala
index 21f0036..a36abe0 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala
@@ -42,7 +42,7 @@ private[spark] class BlockManagerWorker(val blockManager: BlockManager) extends
           val blockMessages = BlockMessageArray.fromBufferMessage(bufferMessage)
           logDebug("Parsed as a block message array")
           val responseMessages = blockMessages.map(processBlockMessage).filter(_ != None).map(_.get)
-          return Some(new BlockMessageArray(responseMessages).toBufferMessage)
+          Some(new BlockMessageArray(responseMessages).toBufferMessage)
         } catch {
           case e: Exception => logError("Exception handling buffer message", e)
           return None
@@ -50,7 +50,7 @@ private[spark] class BlockManagerWorker(val blockManager: BlockManager) extends
       }
       case otherMessage: Any => {
         logError("Unknown type message received: " + otherMessage)
-        return None
+        None
       }
     }
   }
@@ -61,7 +61,7 @@ private[spark] class BlockManagerWorker(val blockManager: BlockManager) extends
         val pB = PutBlock(blockMessage.getId, blockMessage.getData, blockMessage.getLevel)
         logDebug("Received [" + pB + "]")
         putBlock(pB.id, pB.data, pB.level)
-        return None
+        None
       }
       case BlockMessage.TYPE_GET_BLOCK => {
         val gB = new GetBlock(blockMessage.getId)
@@ -70,9 +70,9 @@ private[spark] class BlockManagerWorker(val blockManager: BlockManager) extends
         if (buffer == null) {
           return None
         }
-        return Some(BlockMessage.fromGotBlock(GotBlock(gB.id, buffer)))
+        Some(BlockMessage.fromGotBlock(GotBlock(gB.id, buffer)))
       }
-      case _ => return None
+      case _ => None
     }
   }
 
@@ -93,7 +93,7 @@ private[spark] class BlockManagerWorker(val blockManager: BlockManager) extends
     }
     logDebug("GetBlock " + id + " used " + Utils.getUsedTimeMs(startTimeMs)
         + " and got buffer " + buffer)
-    return buffer
+    buffer
   }
 }
 
@@ -132,6 +132,6 @@ private[spark] object BlockManagerWorker extends Logging {
       }
       case None => logDebug("No response message received"); return null
     }
-    return null
+    null
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/core/src/main/scala/org/apache/spark/storage/BlockMessage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockMessage.scala b/core/src/main/scala/org/apache/spark/storage/BlockMessage.scala
index 80dcb5a..fbafcf7 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockMessage.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockMessage.scala
@@ -154,7 +154,7 @@ private[spark] class BlockMessage() {
     println()
     */
     val finishTime = System.currentTimeMillis
-    return Message.createBufferMessage(buffers)
+    Message.createBufferMessage(buffers)
   }
 
   override def toString: String = {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/core/src/main/scala/org/apache/spark/storage/BlockMessageArray.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockMessageArray.scala b/core/src/main/scala/org/apache/spark/storage/BlockMessageArray.scala
index a06f50a..5932936 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockMessageArray.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockMessageArray.scala
@@ -96,7 +96,7 @@ class BlockMessageArray(var blockMessages: Seq[BlockMessage]) extends Seq[BlockM
     println()
     println()
     */
-    return Message.createBufferMessage(buffers)
+    Message.createBufferMessage(buffers)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
index 05f676c..27f057b 100644
--- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
@@ -245,7 +245,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
         return false
       }
     }
-    return true
+    true
   }
 
   override def contains(blockId: BlockId): Boolean = {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
index b5596df..0f84810 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
@@ -74,7 +74,7 @@ class StorageLevel private(
     if (deserialized_) {
       ret |= 1
     }
-    return ret
+    ret
   }
 
   override def writeExternal(out: ObjectOutput) {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/core/src/main/scala/org/apache/spark/util/AppendOnlyMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/AppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/AppendOnlyMap.scala
index 8bb4ee3..edfa58b 100644
--- a/core/src/main/scala/org/apache/spark/util/AppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/AppendOnlyMap.scala
@@ -67,7 +67,7 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi
         i += 1
       }
     }
-    return null.asInstanceOf[V]
+    null.asInstanceOf[V]
   }
 
   /** Set the value for a key */

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
index 7108595..1df6b87 100644
--- a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
@@ -61,7 +61,7 @@ private[spark] object ClosureCleaner extends Logging {
         return f.getType :: Nil // Stop at the first $outer that is not a closure
       }
     }
-    return Nil
+    Nil
   }
   
   // Get a list of the outer objects for a given closure object.
@@ -74,7 +74,7 @@ private[spark] object ClosureCleaner extends Logging {
         return f.get(obj) :: Nil // Stop at the first $outer that is not a closure
       }
     }
-    return Nil
+    Nil
   }
   
   private def getInnerClasses(obj: AnyRef): List[Class[_]] = {
@@ -174,7 +174,7 @@ private[spark] object ClosureCleaner extends Logging {
         field.setAccessible(true)
         field.set(obj, outer)
       }
-      return obj
+      obj
     }
   }
 }
@@ -182,7 +182,7 @@ private[spark] object ClosureCleaner extends Logging {
 private[spark] class FieldAccessFinder(output: Map[Class[_], Set[String]]) extends ClassVisitor(ASM4) {
   override def visitMethod(access: Int, name: String, desc: String,
       sig: String, exceptions: Array[String]): MethodVisitor = {
-    return new MethodVisitor(ASM4) {
+    new MethodVisitor(ASM4) {
       override def visitFieldInsn(op: Int, owner: String, name: String, desc: String) {
         if (op == GETFIELD) {
           for (cl <- output.keys if cl.getName == owner.replace('/', '.')) {
@@ -215,7 +215,7 @@ private[spark] class InnerClosureFinder(output: Set[Class[_]]) extends ClassVisi
   
   override def visitMethod(access: Int, name: String, desc: String,
       sig: String, exceptions: Array[String]): MethodVisitor = {
-    return new MethodVisitor(ASM4) {
+    new MethodVisitor(ASM4) {
       override def visitMethodInsn(op: Int, owner: String, name: String,
           desc: String) {
         val argTypes = Type.getArgumentTypes(desc)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala b/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
index bddb3bb..3cf9489 100644
--- a/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
+++ b/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
@@ -108,7 +108,7 @@ private[spark] object SizeEstimator extends Logging {
       val bean = ManagementFactory.newPlatformMXBeanProxy(server,
         hotSpotMBeanName, hotSpotMBeanClass)
       // TODO: We could use reflection on the VMOption returned ?
-      return getVMMethod.invoke(bean, "UseCompressedOops").toString.contains("true")
+      getVMMethod.invoke(bean, "UseCompressedOops").toString.contains("true")
     } catch {
       case e: Exception => {
         // Guess whether they've enabled UseCompressedOops based on whether maxMemory < 32 GB
@@ -141,7 +141,7 @@ private[spark] object SizeEstimator extends Logging {
     def dequeue(): AnyRef = {
       val elem = stack.last
       stack.trimEnd(1)
-      return elem
+      elem
     }
   }
 
@@ -162,7 +162,7 @@ private[spark] object SizeEstimator extends Logging {
     while (!state.isFinished) {
       visitSingleObject(state.dequeue(), state)
     }
-    return state.size
+    state.size
   }
 
   private def visitSingleObject(obj: AnyRef, state: SearchState) {
@@ -276,11 +276,11 @@ private[spark] object SizeEstimator extends Logging {
     // Create and cache a new ClassInfo
     val newInfo = new ClassInfo(shellSize, pointerFields)
     classInfos.put(cls, newInfo)
-    return newInfo
+    newInfo
   }
 
   private def alignSize(size: Long): Long = {
     val rem = size % ALIGN_SIZE
-    return if (rem == 0) size else (size + ALIGN_SIZE - rem)
+    if (rem == 0) size else (size + ALIGN_SIZE - rem)
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 5f12531..f80ed29 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -49,14 +49,14 @@ private[spark] object Utils extends Logging {
     val oos = new ObjectOutputStream(bos)
     oos.writeObject(o)
     oos.close()
-    return bos.toByteArray
+    bos.toByteArray
   }
 
   /** Deserialize an object using Java serialization */
   def deserialize[T](bytes: Array[Byte]): T = {
     val bis = new ByteArrayInputStream(bytes)
     val ois = new ObjectInputStream(bis)
-    return ois.readObject.asInstanceOf[T]
+    ois.readObject.asInstanceOf[T]
   }
 
   /** Deserialize an object using Java serialization and the given ClassLoader */
@@ -66,7 +66,7 @@ private[spark] object Utils extends Logging {
       override def resolveClass(desc: ObjectStreamClass) =
         Class.forName(desc.getName, false, loader)
     }
-    return ois.readObject.asInstanceOf[T]
+    ois.readObject.asInstanceOf[T]
   }
 
   /** Deserialize a Long value (used for {@link org.apache.spark.api.python.PythonPartitioner}) */
@@ -144,7 +144,7 @@ private[spark] object Utils extends Logging {
         i += 1
       }
     }
-    return buf
+    buf
   }
 
   private val shutdownDeletePaths = new scala.collection.mutable.HashSet[String]()
@@ -428,7 +428,7 @@ private[spark] object Utils extends Logging {
   def parseHostPort(hostPort: String): (String,  Int) = {
     {
       // Check cache first.
-      var cached = hostPortParseResults.get(hostPort)
+      val cached = hostPortParseResults.get(hostPort)
       if (cached != null) return cached
     }
 
@@ -731,7 +731,7 @@ private[spark] object Utils extends Logging {
     } catch {
       case ise: IllegalStateException => return true
     }
-    return false
+    false
   }
 
   def isSpace(c: Char): Boolean = {
@@ -748,7 +748,7 @@ private[spark] object Utils extends Logging {
     var inWord = false
     var inSingleQuote = false
     var inDoubleQuote = false
-    var curWord = new StringBuilder
+    val curWord = new StringBuilder
     def endWord() {
       buf += curWord.toString
       curWord.clear()
@@ -794,7 +794,7 @@ private[spark] object Utils extends Logging {
     if (inWord || inDoubleQuote || inSingleQuote) {
       endWord()
     }
-    return buf
+    buf
   }
 
  /* Calculates 'x' modulo 'mod', takes to consideration sign of x,
@@ -822,8 +822,7 @@ private[spark] object Utils extends Logging {
 
   /** Returns a copy of the system properties that is thread-safe to iterator over. */
   def getSystemProperties(): Map[String, String] = {
-    return System.getProperties().clone()
-      .asInstanceOf[java.util.Properties].toMap[String, String]
+    System.getProperties.clone().asInstanceOf[java.util.Properties].toMap[String, String]
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/core/src/main/scala/org/apache/spark/util/Vector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Vector.scala b/core/src/main/scala/org/apache/spark/util/Vector.scala
index fe710c5..094edcd 100644
--- a/core/src/main/scala/org/apache/spark/util/Vector.scala
+++ b/core/src/main/scala/org/apache/spark/util/Vector.scala
@@ -25,7 +25,7 @@ class Vector(val elements: Array[Double]) extends Serializable {
   def + (other: Vector): Vector = {
     if (length != other.length)
       throw new IllegalArgumentException("Vectors of different length")
-    return Vector(length, i => this(i) + other(i))
+    Vector(length, i => this(i) + other(i))
   }
 
   def add(other: Vector) = this + other
@@ -33,7 +33,7 @@ class Vector(val elements: Array[Double]) extends Serializable {
   def - (other: Vector): Vector = {
     if (length != other.length)
       throw new IllegalArgumentException("Vectors of different length")
-    return Vector(length, i => this(i) - other(i))
+    Vector(length, i => this(i) - other(i))
   }
 
   def subtract(other: Vector) = this - other
@@ -47,7 +47,7 @@ class Vector(val elements: Array[Double]) extends Serializable {
       ans += this(i) * other(i)
       i += 1
     }
-    return ans
+    ans
   }
 
   /**
@@ -67,7 +67,7 @@ class Vector(val elements: Array[Double]) extends Serializable {
       ans += (this(i) + plus(i)) * other(i)
       i += 1
     }
-    return ans
+    ans
   }
 
   def += (other: Vector): Vector = {
@@ -102,7 +102,7 @@ class Vector(val elements: Array[Double]) extends Serializable {
       ans += (this(i) - other(i)) * (this(i) - other(i))
       i += 1
     }
-    return ans
+    ans
   }
 
   def dist(other: Vector): Double = math.sqrt(squaredDist(other))
@@ -117,7 +117,7 @@ object Vector {
 
   def apply(length: Int, initializer: Int => Double): Vector = {
     val elements: Array[Double] = Array.tabulate(length)(initializer)
-    return new Vector(elements)
+    new Vector(elements)
   }
 
   def zeros(length: Int) = new Vector(new Array[Double](length))

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/core/src/test/scala/org/apache/spark/scheduler/ClusterSchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/ClusterSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ClusterSchedulerSuite.scala
index 7bf2020..235d317 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/ClusterSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/ClusterSchedulerSuite.scala
@@ -64,7 +64,7 @@ class FakeTaskSetManager(
   }
 
   override def getSchedulableByName(name: String): Schedulable = {
-    return null
+    null
   }
 
   override def executorLost(executorId: String, host: String): Unit = {
@@ -79,13 +79,14 @@ class FakeTaskSetManager(
   {
     if (tasksSuccessful + runningTasks < numTasks) {
       increaseRunningTasks(1)
-      return Some(new TaskDescription(0, execId, "task 0:0", 0, null))
+      Some(new TaskDescription(0, execId, "task 0:0", 0, null))
+    } else {
+      None
     }
-    return None
   }
 
   override def checkSpeculatableTasks(): Boolean = {
-    return true
+    true
   }
 
   def taskFinished() {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 2aa259d..14f89d5 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -122,7 +122,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
         locations: Seq[Seq[String]] = Nil
       ): MyRDD = {
     val maxPartition = numPartitions - 1
-    return new MyRDD(sc, dependencies) {
+    new MyRDD(sc, dependencies) {
       override def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] =
         throw new RuntimeException("should not be reached")
       override def getPartitions = (0 to maxPartition).map(i => new Partition {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala
index 5cc48ee..3880e68 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala
@@ -47,7 +47,7 @@ class JobLoggerSuite extends FunSuite with LocalSparkContext with ShouldMatchers
         dependencies: List[Dependency[_]]
       ): MyRDD = {
       val maxPartition = numPartitions - 1
-      return new MyRDD(sc, dependencies) {
+      new MyRDD(sc, dependencies) {
         override def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] =
           throw new RuntimeException("should not be reached")
         override def getPartitions = (0 to maxPartition).map(i => new Partition {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala
index 0ed366f..de4871d 100644
--- a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala
@@ -61,8 +61,8 @@ class NonSerializable {}
 object TestObject {
   def run(): Int = {
     var nonSer = new NonSerializable
-    var x = 5
-    return withSpark(new SparkContext("local", "test")) { sc =>
+    val x = 5
+    withSpark(new SparkContext("local", "test")) { sc =>
       val nums = sc.parallelize(Array(1, 2, 3, 4))
       nums.map(_ + x).reduce(_ + _)
     }
@@ -76,7 +76,7 @@ class TestClass extends Serializable {
 
   def run(): Int = {
     var nonSer = new NonSerializable
-    return withSpark(new SparkContext("local", "test")) { sc =>
+    withSpark(new SparkContext("local", "test")) { sc =>
       val nums = sc.parallelize(Array(1, 2, 3, 4))
       nums.map(_ + getX).reduce(_ + _)
     }
@@ -88,7 +88,7 @@ class TestClassWithoutDefaultConstructor(x: Int) extends Serializable {
 
   def run(): Int = {
     var nonSer = new NonSerializable
-    return withSpark(new SparkContext("local", "test")) { sc =>
+    withSpark(new SparkContext("local", "test")) { sc =>
       val nums = sc.parallelize(Array(1, 2, 3, 4))
       nums.map(_ + getX).reduce(_ + _)
     }
@@ -103,7 +103,7 @@ class TestClassWithoutFieldAccess {
   def run(): Int = {
     var nonSer2 = new NonSerializable
     var x = 5
-    return withSpark(new SparkContext("local", "test")) { sc =>
+    withSpark(new SparkContext("local", "test")) { sc =>
       val nums = sc.parallelize(Array(1, 2, 3, 4))
       nums.map(_ + x).reduce(_ + _)
     }
@@ -115,7 +115,7 @@ object TestObjectWithNesting {
   def run(): Int = {
     var nonSer = new NonSerializable
     var answer = 0
-    return withSpark(new SparkContext("local", "test")) { sc =>
+    withSpark(new SparkContext("local", "test")) { sc =>
       val nums = sc.parallelize(Array(1, 2, 3, 4))
       var y = 1
       for (i <- 1 to 4) {
@@ -134,7 +134,7 @@ class TestClassWithNesting(val y: Int) extends Serializable {
   def run(): Int = {
     var nonSer = new NonSerializable
     var answer = 0
-    return withSpark(new SparkContext("local", "test")) { sc =>
+    withSpark(new SparkContext("local", "test")) { sc =>
       val nums = sc.parallelize(Array(1, 2, 3, 4))
       for (i <- 1 to 4) {
         var nonSer2 = new NonSerializable

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/examples/src/main/scala/org/apache/spark/examples/LocalALS.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/LocalALS.scala b/examples/src/main/scala/org/apache/spark/examples/LocalALS.scala
index 83db8b9..c8ecbb8 100644
--- a/examples/src/main/scala/org/apache/spark/examples/LocalALS.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/LocalALS.scala
@@ -43,7 +43,7 @@ object LocalALS {
   def generateR(): DoubleMatrix2D = {
     val mh = factory2D.random(M, F)
     val uh = factory2D.random(U, F)
-    return algebra.mult(mh, algebra.transpose(uh))
+    algebra.mult(mh, algebra.transpose(uh))
   }
 
   def rmse(targetR: DoubleMatrix2D, ms: Array[DoubleMatrix1D],
@@ -56,7 +56,7 @@ object LocalALS {
     //println("R: " + r)
     blas.daxpy(-1, targetR, r)
     val sumSqs = r.aggregate(Functions.plus, Functions.square)
-    return sqrt(sumSqs / (M * U))
+    sqrt(sumSqs / (M * U))
   }
 
   def updateMovie(i: Int, m: DoubleMatrix1D, us: Array[DoubleMatrix1D],
@@ -80,7 +80,7 @@ object LocalALS {
     val ch = new CholeskyDecomposition(XtX)
     val Xty2D = factory2D.make(Xty.toArray, F)
     val solved2D = ch.solve(Xty2D)
-    return solved2D.viewColumn(0)
+    solved2D.viewColumn(0)
   }
 
   def updateUser(j: Int, u: DoubleMatrix1D, ms: Array[DoubleMatrix1D],
@@ -104,7 +104,7 @@ object LocalALS {
     val ch = new CholeskyDecomposition(XtX)
     val Xty2D = factory2D.make(Xty.toArray, F)
     val solved2D = ch.solve(Xty2D)
-    return solved2D.viewColumn(0)
+    solved2D.viewColumn(0)
   }
 
   def main(args: Array[String]) {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/examples/src/main/scala/org/apache/spark/examples/LocalFileLR.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/LocalFileLR.scala b/examples/src/main/scala/org/apache/spark/examples/LocalFileLR.scala
index fb130ea..9ab5f5a 100644
--- a/examples/src/main/scala/org/apache/spark/examples/LocalFileLR.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/LocalFileLR.scala
@@ -28,7 +28,7 @@ object LocalFileLR {
 
   def parsePoint(line: String): DataPoint = {
     val nums = line.split(' ').map(_.toDouble)
-    return DataPoint(new Vector(nums.slice(1, D+1)), nums(0))
+    DataPoint(new Vector(nums.slice(1, D+1)), nums(0))
   }
 
   def main(args: Array[String]) {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/examples/src/main/scala/org/apache/spark/examples/LocalKMeans.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/LocalKMeans.scala b/examples/src/main/scala/org/apache/spark/examples/LocalKMeans.scala
index f90ea35..a730464 100644
--- a/examples/src/main/scala/org/apache/spark/examples/LocalKMeans.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/LocalKMeans.scala
@@ -55,7 +55,7 @@ object LocalKMeans {
       }
     }
 
-    return bestIndex
+    bestIndex
   }
 
   def main(args: Array[String]) {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/examples/src/main/scala/org/apache/spark/examples/SparkALS.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkALS.scala b/examples/src/main/scala/org/apache/spark/examples/SparkALS.scala
index 30c86d8..17bafc2 100644
--- a/examples/src/main/scala/org/apache/spark/examples/SparkALS.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/SparkALS.scala
@@ -44,7 +44,7 @@ object SparkALS {
   def generateR(): DoubleMatrix2D = {
     val mh = factory2D.random(M, F)
     val uh = factory2D.random(U, F)
-    return algebra.mult(mh, algebra.transpose(uh))
+    algebra.mult(mh, algebra.transpose(uh))
   }
 
   def rmse(targetR: DoubleMatrix2D, ms: Array[DoubleMatrix1D],
@@ -57,7 +57,7 @@ object SparkALS {
     //println("R: " + r)
     blas.daxpy(-1, targetR, r)
     val sumSqs = r.aggregate(Functions.plus, Functions.square)
-    return sqrt(sumSqs / (M * U))
+    sqrt(sumSqs / (M * U))
   }
 
   def update(i: Int, m: DoubleMatrix1D, us: Array[DoubleMatrix1D],
@@ -83,7 +83,7 @@ object SparkALS {
     val ch = new CholeskyDecomposition(XtX)
     val Xty2D = factory2D.make(Xty.toArray, F)
     val solved2D = ch.solve(Xty2D)
-    return solved2D.viewColumn(0)
+    solved2D.viewColumn(0)
   }
 
   def main(args: Array[String]) {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala b/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala
index ff72532..3981906 100644
--- a/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala
@@ -43,7 +43,7 @@ object SparkHdfsLR {
     while (i < D) {
       x(i) = tok.nextToken.toDouble; i += 1
     }
-    return DataPoint(new Vector(x), y)
+    DataPoint(new Vector(x), y)
   }
 
   def main(args: Array[String]) {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala b/examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala
index 8c99025..9fe2465 100644
--- a/examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala
@@ -30,7 +30,7 @@ object SparkKMeans {
   val rand = new Random(42)
     
   def parseVector(line: String): Vector = {
-      return new Vector(line.split(' ').map(_.toDouble))
+    new Vector(line.split(' ').map(_.toDouble))
   }
   
   def closestPoint(p: Vector, centers: Array[Vector]): Int = {
@@ -46,7 +46,7 @@ object SparkKMeans {
       }
     }
   
-    return bestIndex
+    bestIndex
   }
 
   def main(args: Array[String]) {
@@ -61,15 +61,15 @@ object SparkKMeans {
     val K = args(2).toInt
     val convergeDist = args(3).toDouble
   
-    var kPoints = data.takeSample(false, K, 42).toArray
+    val kPoints = data.takeSample(withReplacement = false, K, 42).toArray
     var tempDist = 1.0
 
     while(tempDist > convergeDist) {
-      var closest = data.map (p => (closestPoint(p, kPoints), (p, 1)))
+      val closest = data.map (p => (closestPoint(p, kPoints), (p, 1)))
       
-      var pointStats = closest.reduceByKey{case ((x1, y1), (x2, y2)) => (x1 + x2, y1 + y2)}
+      val pointStats = closest.reduceByKey{case ((x1, y1), (x2, y2)) => (x1 + x2, y1 + y2)}
       
-      var newPoints = pointStats.map {pair => (pair._1, pair._2._1 / pair._2._2)}.collectAsMap()
+      val newPoints = pointStats.map {pair => (pair._1, pair._2._1 / pair._2._2)}.collectAsMap()
       
       tempDist = 0.0
       for (i <- 0 until K) {
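
Beyond dropping `return`s and switching to `val`, the takeSample call above names its Boolean argument (`withReplacement = false`), which documents intent at the call site. A small illustration with a hypothetical method of the same shape:

    object NamedArgumentExample {
      // Hypothetical signature, shaped like takeSample(withReplacement, num, seed)
      def takeSampleLike(withReplacement: Boolean, num: Int, seed: Long): String =
        s"withReplacement=$withReplacement, num=$num, seed=$seed"

      def main(args: Array[String]): Unit =
        println(takeSampleLike(withReplacement = false, num = 3, seed = 42L))
    }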

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewGenerator.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewGenerator.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewGenerator.scala
index 4fe57de..a260098 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewGenerator.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewGenerator.scala
@@ -65,7 +65,7 @@ object PageViewGenerator {
         return item
       }
     }
-    return inputMap.take(1).head._1 // Shouldn't get here if probabilities add up to 1.0
+    inputMap.take(1).head._1 // Shouldn't get here if probabilities add up to 1.0
   }
 
   def getNextClickEvent() : String = {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
index 2d86233..c972a71 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
@@ -48,7 +48,7 @@ class PythonMLLibAPI extends Serializable {
     val db = bb.asDoubleBuffer()
     val ans = new Array[Double](length.toInt)
     db.get(ans)
-    return ans
+    ans
   }
 
   private def serializeDoubleVector(doubles: Array[Double]): Array[Byte] = {
@@ -60,7 +60,7 @@ class PythonMLLibAPI extends Serializable {
     bb.putLong(len)
     val db = bb.asDoubleBuffer()
     db.put(doubles)
-    return bytes
+    bytes
   }
 
   private def deserializeDoubleMatrix(bytes: Array[Byte]): Array[Array[Double]] = {
@@ -86,7 +86,7 @@ class PythonMLLibAPI extends Serializable {
       ans(i) = new Array[Double](cols.toInt)
       db.get(ans(i))
     }
-    return ans
+    ans
   }
 
   private def serializeDoubleMatrix(doubles: Array[Array[Double]]): Array[Byte] = {
@@ -102,11 +102,10 @@ class PythonMLLibAPI extends Serializable {
     bb.putLong(rows)
     bb.putLong(cols)
     val db = bb.asDoubleBuffer()
-    var i = 0
     for (i <- 0 until rows) {
       db.put(doubles(i))
     }
-    return bytes
+    bytes
   }
 
   private def trainRegressionModel(trainFunc: (RDD[LabeledPoint], Array[Double]) => GeneralizedLinearModel,
@@ -121,7 +120,7 @@ class PythonMLLibAPI extends Serializable {
     val ret = new java.util.LinkedList[java.lang.Object]()
     ret.add(serializeDoubleVector(model.weights))
     ret.add(model.intercept: java.lang.Double)
-    return ret
+    ret
   }
 
   /**
@@ -130,7 +129,7 @@ class PythonMLLibAPI extends Serializable {
   def trainLinearRegressionModelWithSGD(dataBytesJRDD: JavaRDD[Array[Byte]],
       numIterations: Int, stepSize: Double, miniBatchFraction: Double,
       initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
-    return trainRegressionModel((data, initialWeights) =>
+    trainRegressionModel((data, initialWeights) =>
         LinearRegressionWithSGD.train(data, numIterations, stepSize,
                                       miniBatchFraction, initialWeights),
         dataBytesJRDD, initialWeightsBA)
@@ -142,7 +141,7 @@ class PythonMLLibAPI extends Serializable {
   def trainLassoModelWithSGD(dataBytesJRDD: JavaRDD[Array[Byte]], numIterations: Int,
       stepSize: Double, regParam: Double, miniBatchFraction: Double,
       initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
-    return trainRegressionModel((data, initialWeights) =>
+    trainRegressionModel((data, initialWeights) =>
         LassoWithSGD.train(data, numIterations, stepSize, regParam,
                            miniBatchFraction, initialWeights),
         dataBytesJRDD, initialWeightsBA)
@@ -154,7 +153,7 @@ class PythonMLLibAPI extends Serializable {
   def trainRidgeModelWithSGD(dataBytesJRDD: JavaRDD[Array[Byte]], numIterations: Int,
       stepSize: Double, regParam: Double, miniBatchFraction: Double,
       initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
-    return trainRegressionModel((data, initialWeights) =>
+    trainRegressionModel((data, initialWeights) =>
         RidgeRegressionWithSGD.train(data, numIterations, stepSize, regParam,
                                      miniBatchFraction, initialWeights),
         dataBytesJRDD, initialWeightsBA)
@@ -166,7 +165,7 @@ class PythonMLLibAPI extends Serializable {
   def trainSVMModelWithSGD(dataBytesJRDD: JavaRDD[Array[Byte]], numIterations: Int,
       stepSize: Double, regParam: Double, miniBatchFraction: Double,
       initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
-    return trainRegressionModel((data, initialWeights) =>
+    trainRegressionModel((data, initialWeights) =>
         SVMWithSGD.train(data, numIterations, stepSize, regParam,
                                      miniBatchFraction, initialWeights),
         dataBytesJRDD, initialWeightsBA)
@@ -178,7 +177,7 @@ class PythonMLLibAPI extends Serializable {
   def trainLogisticRegressionModelWithSGD(dataBytesJRDD: JavaRDD[Array[Byte]],
       numIterations: Int, stepSize: Double, miniBatchFraction: Double,
       initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
-    return trainRegressionModel((data, initialWeights) =>
+    trainRegressionModel((data, initialWeights) =>
         LogisticRegressionWithSGD.train(data, numIterations, stepSize,
                                      miniBatchFraction, initialWeights),
         dataBytesJRDD, initialWeightsBA)
@@ -194,7 +193,7 @@ class PythonMLLibAPI extends Serializable {
     val model = KMeans.train(data, k, maxIterations, runs, initializationMode)
     val ret = new java.util.LinkedList[java.lang.Object]()
     ret.add(serializeDoubleMatrix(model.clusterCenters))
-    return ret
+    ret
   }
 
   /** Unpack a Rating object from an array of bytes */
@@ -204,7 +203,7 @@ class PythonMLLibAPI extends Serializable {
     val user = bb.getInt()
     val product = bb.getInt()
     val rating = bb.getDouble()
-    return new Rating(user, product, rating)
+    new Rating(user, product, rating)
   }
 
   /** Unpack a tuple of Ints from an array of bytes */
@@ -245,7 +244,7 @@ class PythonMLLibAPI extends Serializable {
   def trainALSModel(ratingsBytesJRDD: JavaRDD[Array[Byte]], rank: Int,
       iterations: Int, lambda: Double, blocks: Int): MatrixFactorizationModel = {
     val ratings = ratingsBytesJRDD.rdd.map(unpackRating)
-    return ALS.train(ratings, rank, iterations, lambda, blocks)
+    ALS.train(ratings, rank, iterations, lambda, blocks)
   }
 
   /**
@@ -257,6 +256,6 @@ class PythonMLLibAPI extends Serializable {
   def trainImplicitALSModel(ratingsBytesJRDD: JavaRDD[Array[Byte]], rank: Int,
       iterations: Int, lambda: Double, blocks: Int, alpha: Double): MatrixFactorizationModel = {
     val ratings = ratingsBytesJRDD.rdd.map(unpackRating)
-    return ALS.trainImplicit(ratings, rank, iterations, lambda, blocks, alpha)
+    ALS.trainImplicit(ratings, rank, iterations, lambda, blocks, alpha)
   }
 }
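
Two details worth noting in the PythonMLLibAPI hunks above: the dropped "var i = 0" in serializeDoubleMatrix was never read, because the for-comprehension generator (i <- 0 until rows) binds its own i, so removing it cannot change behavior; and the train*WithSGD methods now simply evaluate to the trainRegressionModel call, since that call is the last expression. A rough sketch of the same higher-order shape, with invented names and deliberately simplified types:

    // sketch only; invented names and simplified types
    def withTrainer[A](train: Seq[Double] => A)(data: Seq[Double]): A = {
      val model = train(data)   // run the caller-supplied training function
      model                     // last expression is the result; no return needed
    }

    // usage sketch: val mean = withTrainer(xs => xs.sum / xs.size)(Seq(1.0, 2.0, 3.0))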

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index ca0115f..ebfb8db 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -203,6 +203,6 @@ class ObjectInputStreamWithLoader(inputStream_ : InputStream, loader: ClassLoade
     } catch {
       case e: Exception =>
     }
-    return super.resolveClass(desc)
+    super.resolveClass(desc)
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index fb9eda8..a7ba233 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -219,7 +219,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
           reset()
           return false
       }
-      return true
+      true
     }
   }
 }
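
The FileInputDStream hunk shows the mixed style this change keeps: returns that exit early from the middle of a method stay as they are, and only the final return keyword is dropped. A small sketch of that shape, using a made-up validation method:

    // sketch only; invented example, not from the Spark sources
    def isValidPort(port: Int): Boolean = {
      if (port <= 0) {
        return false        // early exit still needs the keyword
      }
      if (port > 65535) {
        return false
      }
      true                  // final expression; no return required
    }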

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
index e0ff3cc..b34ba7b 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
@@ -65,7 +65,7 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag](
             val cogroupedRDD = parentRDD.cogroup(prevStateRDD, partitioner)
             val stateRDD = cogroupedRDD.mapPartitions(finalFunc, preservePartitioning)
             //logDebug("Generating state RDD for time " + validTime)
-            return Some(stateRDD)
+            Some(stateRDD)
           }
           case None => {    // If parent RDD does not exist
 
@@ -76,7 +76,7 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag](
               updateFuncLocal(i)
             }
             val stateRDD = prevStateRDD.mapPartitions(finalFunc, preservePartitioning)
-            return Some(stateRDD)
+            Some(stateRDD)
           }
         }
       }
@@ -98,11 +98,11 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag](
             val groupedRDD = parentRDD.groupByKey(partitioner)
             val sessionRDD = groupedRDD.mapPartitions(finalFunc, preservePartitioning)
             //logDebug("Generating state RDD for time " + validTime + " (first)")
-            return Some(sessionRDD)
+            Some(sessionRDD)
           }
           case None => { // If parent RDD does not exist, then nothing to do!
             //logDebug("Not generating state RDD (no previous state, no parent)")
-            return None
+            None
           }
         }
       }
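
In StateDStream the removed returns sit inside the cases of a match: each case is itself an expression, so its last line is the value of the whole match, and of the method when the match is the tail of the body. An illustrative sketch with invented names:

    // sketch only; invented names and types
    def doubled(cache: Map[String, Int], key: String): Option[Int] = {
      cache.get(key) match {
        case Some(v) => Some(v * 2)   // no return Some(...) needed
        case None    => None
      }
    }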

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
index 110a20f..73dc520 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
@@ -76,6 +76,6 @@ private[spark] class StreamingListenerBus() extends Logging {
        * add overhead in the general case. */
       Thread.sleep(10)
     }
-    return true
+    true
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala
index f67bb2f..c3a849d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala
@@ -66,7 +66,7 @@ class SystemClock() extends Clock {
         }
       Thread.sleep(sleepTime)
     }
-    return -1
+    -1
   }
 }
 
@@ -96,6 +96,6 @@ class ManualClock() extends Clock {
         this.wait(100)
       }      
     }
-    return currentTime()
+    currentTime()
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala
index 4e6ce6e..5b6c048 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala
@@ -90,7 +90,7 @@ object RawTextHelper {
         }
       }
     }
-    return taken.toIterator  
+    taken.toIterator
   }
  
   /**

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 23781ea..e1fe09e 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -158,7 +158,7 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf)
     val appContext = Records.newRecord(classOf[ApplicationSubmissionContext])
     appContext.setApplicationId(appId)
     appContext.setApplicationName(args.appName)
-    return appContext
+    appContext
   }
 
   /** See if two file systems are the same or not. */
@@ -191,9 +191,10 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf)
     }
     //check for ports
     if (srcUri.getPort() != dstUri.getPort()) {
-      return false
+      false
+    } else {
+      true
     }
-    return true
   }
 
   /** Copy the file into HDFS if needed. */
@@ -299,7 +300,7 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf)
     }
 
     UserGroupInformation.getCurrentUser().addCredentials(credentials)
-    return localResources
+    localResources
   }
 
   def setupLaunchEnv(
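
The port check in the file-system comparison above is a case where plain deletion was not enough: the old code used return false / return true, so it is rewritten as an if/else whose value becomes the method's result. As a further, purely hypothetical simplification that is not part of this commit, the same check could be expressed as a single boolean expression:

    // sketch only: equivalent to the if/else introduced above, not part of this commit
    def samePort(srcUri: java.net.URI, dstUri: java.net.URI): Boolean =
      srcUri.getPort() == dstUri.getPort()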

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
----------------------------------------------------------------------
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
index ddfec1a..0138d7a 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
@@ -125,7 +125,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
     val containerId = ConverterUtils.toContainerId(containerIdString)
     val appAttemptId = containerId.getApplicationAttemptId()
     logInfo("ApplicationAttemptId: " + appAttemptId)
-    return appAttemptId
+    appAttemptId
   }
 
   private def registerWithResourceManager(): AMRMProtocol = {
@@ -133,7 +133,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
       YarnConfiguration.RM_SCHEDULER_ADDRESS,
       YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS))
     logInfo("Connecting to ResourceManager at " + rmAddress)
-    return rpc.getProxy(classOf[AMRMProtocol], rmAddress, conf).asInstanceOf[AMRMProtocol]
+    rpc.getProxy(classOf[AMRMProtocol], rmAddress, conf).asInstanceOf[AMRMProtocol]
   }
 
   private def registerApplicationMaster(): RegisterApplicationMasterResponse = {
@@ -147,7 +147,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
     appMasterRequest.setRpcPort(0)
     // What do we provide here ? Might make sense to expose something sensible later ?
     appMasterRequest.setTrackingUrl("")
-    return resourceManager.registerApplicationMaster(appMasterRequest)
+    resourceManager.registerApplicationMaster(appMasterRequest)
   }
 
   private def waitForSparkMaster() {
@@ -220,7 +220,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
     t.setDaemon(true)
     t.start()
     logInfo("Started progress reporter thread - sleep time : " + sleepTime)
-    return t
+    t
   }
 
   private def sendProgress() {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
----------------------------------------------------------------------
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
index 132630e..d32cdcc 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
@@ -195,7 +195,7 @@ class WorkerRunnable(
     }
 
     logInfo("Prepared Local resources " + localResources)
-    return localResources
+    localResources
   }
 
   def prepareEnvironment: HashMap[String, String] = {
@@ -207,7 +207,7 @@ class WorkerRunnable(
     Apps.setEnvFromInputString(env, System.getenv("SPARK_YARN_USER_ENV"))
 
     System.getenv().filterKeys(_.startsWith("SPARK")).foreach { case (k,v) => env(k) = v }
-    return env
+    env
   }
 
   def connectToCM: ContainerManager = {
@@ -226,8 +226,7 @@ class WorkerRunnable(
     val proxy = user
         .doAs(new PrivilegedExceptionAction[ContainerManager] {
           def run: ContainerManager = {
-            return rpc.getProxy(classOf[ContainerManager],
-                cmAddress, conf).asInstanceOf[ContainerManager]
+            rpc.getProxy(classOf[ContainerManager], cmAddress, conf).asInstanceOf[ContainerManager]
           }
         })
     proxy
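
In connectToCM the removed return lives inside the run method of an anonymous class, so it was a normal local return; still, dropping the keyword avoids confusion with function literals, where a Scala return exits the enclosing method via a thrown control-flow exception rather than returning locally. A rough sketch of the anonymous-class shape, using PrivilegedAction for brevity and invented names:

    // sketch only; simplified stand-in for the doAs/proxy construction above
    import java.security.PrivilegedAction

    def runAsPrivileged(): String = {
      val action = new PrivilegedAction[String] {
        def run(): String = {
          val value = "result"   // stand-in for the real proxy construction
          value                  // implicit result of run()
        }
      }
      action.run()
    }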

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
index 5f159b0..535abbf 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
@@ -143,7 +143,7 @@ class ClientDistributedCacheManager() extends Logging {
     if (isPublic(conf, uri, statCache)) {
       return LocalResourceVisibility.PUBLIC 
     } 
-    return LocalResourceVisibility.PRIVATE
+    LocalResourceVisibility.PRIVATE
   }
 
   /**
@@ -161,7 +161,7 @@ class ClientDistributedCacheManager() extends Logging {
     if (!checkPermissionOfOther(fs, current, FsAction.READ, statCache)) {
       return false
     }
-    return ancestorsHaveExecutePermissions(fs, current.getParent(), statCache)
+    ancestorsHaveExecutePermissions(fs, current.getParent(), statCache)
   }
 
   /**
@@ -183,7 +183,7 @@ class ClientDistributedCacheManager() extends Logging {
       }
       current = current.getParent()
     }
-    return true
+    true
   }
 
   /**
@@ -203,7 +203,7 @@ class ClientDistributedCacheManager() extends Logging {
     if (otherAction.implies(action)) {
       return true
     }
-    return false
+    false
   }
 
   /**
@@ -223,6 +223,6 @@ class ClientDistributedCacheManager() extends Logging {
         statCache.put(uri, newStat)
         newStat
     }
-    return stat
+    stat
   }
 }
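
The last hunk in ClientDistributedCacheManager shows the cache-or-compute shape: the match expression's value is bound to a val, and that val, as the final expression, is the method's result. A rough sketch with invented names and simplified types:

    // sketch only; invented names, simplified types
    import scala.collection.mutable

    def cachedLength(cache: mutable.Map[String, Int], key: String): Int = {
      val len = cache.get(key) match {
        case Some(v) => v
        case None =>
          val computed = key.length    // stand-in for the real status lookup
          cache.put(key, computed)
          computed
      }
      len                              // implicit result replaces return len
    }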

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala b/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala
index 2941356..458df4f 100644
--- a/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala
+++ b/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala
@@ -42,7 +42,7 @@ class ClientDistributedCacheManagerSuite extends FunSuite with MockitoSugar {
   class MockClientDistributedCacheManager extends ClientDistributedCacheManager {
     override def getVisibility(conf: Configuration, uri: URI, statCache: Map[URI, FileStatus]): 
         LocalResourceVisibility = {
-      return LocalResourceVisibility.PRIVATE
+      LocalResourceVisibility.PRIVATE
     }
   }