Posted to commits@spark.apache.org by pw...@apache.org on 2014/02/09 19:09:31 UTC

[3/3] git commit: Merge pull request #557 from ScrapCodes/style. Closes #557.

Merge pull request #557 from ScrapCodes/style. Closes #557.

SPARK-1058, Fix Style Errors and Add Scala Style to Spark Build.

Author: Patrick Wendell <pw...@gmail.com>
Author: Prashant Sharma <sc...@gmail.com>

== Merge branch commits ==

commit 1a8bd1c059b842cb95cc246aaea74a79fec684f4
Author: Prashant Sharma <sc...@gmail.com>
Date:   Sun Feb 9 17:39:07 2014 +0530

    scala style fixes

commit f91709887a8e0b608c5c2b282db19b8a44d53a43
Author: Patrick Wendell <pw...@gmail.com>
Date:   Fri Jan 24 11:22:53 2014 -0800

    Adding scalastyle snapshot


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/b69f8b2a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/b69f8b2a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/b69f8b2a

Branch: refs/heads/master
Commit: b69f8b2a01669851c656739b6886efe4cddef31a
Parents: b6dba10
Author: Patrick Wendell <pw...@gmail.com>
Authored: Sun Feb 9 10:09:19 2014 -0800
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Sun Feb 9 10:09:19 2014 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/bagel/Bagel.scala    |  55 ++++----
 .../scala/org/apache/spark/CacheManager.scala   |   4 +-
 .../org/apache/spark/FetchFailedException.scala |   3 +-
 .../org/apache/spark/MapOutputTracker.scala     |   2 +-
 .../scala/org/apache/spark/SparkContext.scala   |  26 ++--
 .../main/scala/org/apache/spark/SparkEnv.scala  |   7 +-
 .../apache/spark/api/java/JavaDoubleRDD.scala   |   4 +-
 .../org/apache/spark/api/java/JavaPairRDD.scala |  40 +++---
 .../org/apache/spark/api/java/JavaRDDLike.scala |  18 +--
 .../spark/api/java/JavaSparkContext.scala       |   8 +-
 .../spark/api/python/PythonPartitioner.scala    |   3 +-
 .../org/apache/spark/api/python/PythonRDD.scala |  13 +-
 .../spark/broadcast/TorrentBroadcast.scala      |   3 +-
 .../apache/spark/deploy/ClientArguments.scala   |  20 +--
 .../spark/deploy/FaultToleranceTest.scala       |  29 +++--
 .../apache/spark/deploy/LocalSparkCluster.scala |   3 +-
 .../apache/spark/deploy/client/AppClient.scala  |   3 +-
 .../spark/deploy/client/AppClientListener.scala |   3 +-
 .../org/apache/spark/deploy/master/Master.scala |  18 +--
 .../spark/deploy/master/ui/IndexPage.scala      |  36 +++---
 .../spark/deploy/worker/CommandUtils.scala      |   3 +-
 .../spark/deploy/worker/DriverWrapper.scala     |   2 +-
 .../spark/deploy/worker/WorkerWatcher.scala     |   2 +-
 .../spark/deploy/worker/ui/IndexPage.scala      |  12 +-
 .../spark/deploy/worker/ui/WorkerWebUI.scala    |   2 +-
 .../org/apache/spark/executor/Executor.scala    |   5 +-
 .../spark/executor/ExecutorExitCode.scala       |   5 +-
 .../apache/spark/executor/ExecutorSource.scala  |   3 +-
 .../org/apache/spark/executor/TaskMetrics.scala |   3 +-
 .../apache/spark/metrics/MetricsSystem.scala    |   3 +-
 .../org/apache/spark/network/Connection.scala   |  10 +-
 .../spark/network/ConnectionManager.scala       |  68 ++++++----
 .../spark/network/ConnectionManagerTest.scala   |  23 ++--
 .../org/apache/spark/network/SenderTest.scala   |  19 +--
 .../org/apache/spark/rdd/CheckpointRDD.scala    |   3 +-
 .../org/apache/spark/rdd/CoalescedRDD.scala     |   9 +-
 .../apache/spark/rdd/DoubleRDDFunctions.scala   |   4 +-
 .../org/apache/spark/rdd/PairRDDFunctions.scala |   2 +-
 .../spark/rdd/ParallelCollectionRDD.scala       |   3 +-
 .../spark/rdd/PartitionerAwareUnionRDD.scala    |   4 +-
 .../apache/spark/rdd/RDDCheckpointData.scala    |  10 +-
 .../spark/rdd/SequenceFileRDDFunctions.scala    |   3 +-
 .../apache/spark/scheduler/DAGScheduler.scala   |   3 +-
 .../spark/scheduler/InputFormatInfo.scala       |  32 +++--
 .../org/apache/spark/scheduler/JobLogger.scala  |  10 +-
 .../org/apache/spark/scheduler/JobResult.scala  |   3 +-
 .../org/apache/spark/scheduler/ResultTask.scala |   6 +-
 .../spark/scheduler/SchedulableBuilder.scala    |   2 +-
 .../apache/spark/scheduler/SparkListener.scala  |  30 +++--
 .../org/apache/spark/scheduler/Stage.scala      |   3 +-
 .../org/apache/spark/scheduler/StageInfo.scala  |   3 +-
 .../org/apache/spark/scheduler/TaskInfo.scala   |  11 +-
 .../org/apache/spark/scheduler/TaskResult.scala |   3 +-
 .../apache/spark/scheduler/TaskScheduler.scala  |   3 +-
 .../apache/spark/scheduler/TaskSetManager.scala |   2 +-
 .../cluster/CoarseGrainedSchedulerBackend.scala |   3 +-
 .../cluster/SparkDeploySchedulerBackend.scala   |   7 +-
 .../mesos/CoarseMesosSchedulerBackend.scala     |   3 +-
 .../spark/serializer/KryoSerializer.scala       |   3 +-
 .../apache/spark/serializer/Serializer.scala    |   7 +-
 .../spark/storage/BlockFetcherIterator.scala    |   6 +-
 .../org/apache/spark/storage/BlockManager.scala |  16 ++-
 .../spark/storage/BlockManagerMasterActor.scala |   8 +-
 .../spark/storage/BlockManagerSource.scala      |   6 +-
 .../spark/storage/BlockMessageArray.scala       |   6 +-
 .../org/apache/spark/storage/StorageUtils.scala |  19 +--
 .../scala/org/apache/spark/ui/UIUtils.scala     |  15 ++-
 .../apache/spark/ui/UIWorkloadGenerator.scala   |   3 +-
 .../org/apache/spark/ui/exec/ExecutorsUI.scala  |   6 +-
 .../org/apache/spark/ui/jobs/IndexPage.scala    |   3 +-
 .../org/apache/spark/ui/jobs/PoolTable.scala    |   5 +-
 .../org/apache/spark/ui/jobs/StagePage.scala    |  18 +--
 .../org/apache/spark/ui/jobs/StageTable.scala   |   3 +-
 .../org/apache/spark/util/ClosureCleaner.scala  |  11 +-
 .../apache/spark/util/CompletionIterator.scala  |   5 +-
 .../org/apache/spark/util/Distribution.scala    |   8 +-
 .../org/apache/spark/util/MetadataCleaner.scala |   3 +-
 .../spark/util/SerializableHyperLogLog.scala    |   3 +-
 .../org/apache/spark/util/SizeEstimator.scala   |  20 +--
 .../org/apache/spark/util/StatCounter.scala     |  15 ++-
 .../scala/org/apache/spark/util/Utils.scala     |   6 +-
 .../scala/org/apache/spark/util/Vector.scala    |  21 ++--
 .../apache/spark/util/collection/BitSet.scala   |   2 +-
 .../util/collection/ExternalAppendOnlyMap.scala |   2 +-
 .../spark/util/collection/OpenHashSet.scala     |   2 +-
 .../examples/StatefulNetworkWordCount.scala     |  13 +-
 .../streaming/examples/TwitterAlgebirdHLL.scala |  19 +--
 .../clickstream/PageViewGenerator.scala         |   4 +-
 .../examples/clickstream/PageViewStream.scala   |   3 +-
 .../streaming/kafka/KafkaInputDStream.scala     |  14 ++-
 .../spark/streaming/kafka/KafkaUtils.scala      |   3 +-
 .../spark/streaming/zeromq/ZeroMQReceiver.scala |   4 +-
 .../spark/streaming/zeromq/ZeroMQUtils.scala    |  25 ++--
 .../scala/org/apache/spark/graphx/EdgeRDD.scala |   7 +-
 .../scala/org/apache/spark/graphx/Graph.scala   |  19 +--
 .../apache/spark/graphx/PartitionStrategy.scala |  14 ++-
 .../org/apache/spark/graphx/VertexRDD.scala     |  23 ++--
 .../org/apache/spark/graphx/lib/Analytics.scala |   2 +-
 .../spark/graphx/util/GraphGenerators.scala     |  31 ++---
 .../spark/mllib/api/python/PythonMLLibAPI.scala |   3 +-
 .../org/apache/spark/mllib/linalg/SVD.scala     |   8 +-
 .../mllib/optimization/GradientDescent.scala    |   2 +-
 .../apache/spark/mllib/recommendation/ALS.scala |  20 +--
 project/SparkBuild.scala                        |   4 +-
 project/build.properties                        |   2 +-
 project/plugins.sbt                             |  11 +-
 project/project/SparkPluginBuild.scala          |   2 +-
 .../apache/spark/repl/ExecutorClassLoader.scala |  15 ++-
 .../org/apache/spark/repl/SparkExprTyper.scala  |   2 +
 .../org/apache/spark/repl/SparkILoop.scala      |   2 +
 .../org/apache/spark/repl/SparkILoopInit.scala  |   2 +
 .../org/apache/spark/repl/SparkIMain.scala      |   2 +
 .../org/apache/spark/repl/SparkImports.scala    |   2 +
 .../spark/repl/SparkJLineCompletion.scala       |   2 +
 .../apache/spark/repl/SparkJLineReader.scala    |   2 +
 .../apache/spark/repl/SparkMemberHandlers.scala |   2 +
 scalastyle-config.xml                           | 126 +++++++++++++++++++
 .../streaming/util/MasterFailureTest.scala      |  17 +--
 .../tools/JavaAPICompletenessChecker.scala      |   6 +-
 119 files changed, 795 insertions(+), 460 deletions(-)
----------------------------------------------------------------------
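
The build-side changes are small: project/plugins.sbt pulls in the scalastyle sbt plugin, and
the new scalastyle-config.xml (126 lines) holds the rules the rest of the patch is made to
satisfy. As a rough sketch of how such a plugin is typically wired in -- the plugin version
below is an assumption, not read from this patch:

  // project/plugins.sbt -- illustrative only; the patch pins its own snapshot version.
  addSbtPlugin("org.scalastyle" %% "scalastyle-sbt-plugin" % "0.4.0")

With the plugin on the build classpath, `sbt scalastyle` checks each module against
scalastyle-config.xml, for example the maximum-line-length rule that drives most of the
re-wrapping in the hunks below.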


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala
----------------------------------------------------------------------
diff --git a/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala b/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala
index 44e26bb..2812166 100644
--- a/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala
+++ b/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala
@@ -28,21 +28,22 @@ object Bagel extends Logging {
   /**
    * Runs a Bagel program.
    * @param sc [[org.apache.spark.SparkContext]] to use for the program.
-   * @param vertices vertices of the graph represented as an RDD of (Key, Vertex) pairs. Often the Key will be
-   *                 the vertex id.
-   * @param messages initial set of messages represented as an RDD of (Key, Message) pairs. Often this will be an
-   *                 empty array, i.e. sc.parallelize(Array[K, Message]()).
-   * @param combiner [[org.apache.spark.bagel.Combiner]] combines multiple individual messages to a given vertex into one
-   *                message before sending (which often involves network I/O).
-   * @param aggregator [[org.apache.spark.bagel.Aggregator]] performs a reduce across all vertices after each superstep,
-   *                  and provides the result to each vertex in the next superstep.
+   * @param vertices vertices of the graph represented as an RDD of (Key, Vertex) pairs. Often the
+   *                 Key will be the vertex id.
+   * @param messages initial set of messages represented as an RDD of (Key, Message) pairs. Often
+   *                 this will be an empty array, i.e. sc.parallelize(Array[K, Message]()).
+   * @param combiner [[org.apache.spark.bagel.Combiner]] combines multiple individual messages to a
+   *                 given vertex into one message before sending (which often involves network I/O).
+   * @param aggregator [[org.apache.spark.bagel.Aggregator]] performs a reduce across all vertices
+   *                  after each superstep and provides the result to each vertex in the next
+   *                  superstep.
    * @param partitioner [[org.apache.spark.Partitioner]] partitions values by key
    * @param numPartitions number of partitions across which to split the graph.
    *                      Default is the default parallelism of the SparkContext
-   * @param storageLevel [[org.apache.spark.storage.StorageLevel]] to use for caching of intermediate RDDs in each superstep.
-   *                    Defaults to caching in memory.
-   * @param compute function that takes a Vertex, optional set of (possibly combined) messages to the Vertex,
-   *                optional Aggregator and the current superstep,
+   * @param storageLevel [[org.apache.spark.storage.StorageLevel]] to use for caching of
+   *                    intermediate RDDs in each superstep. Defaults to caching in memory.
+   * @param compute function that takes a Vertex, optional set of (possibly combined) messages to
+   *                the Vertex, optional Aggregator and the current superstep,
    *                and returns a set of (Vertex, outgoing Messages) pairs
    * @tparam K key
    * @tparam V vertex type
@@ -71,7 +72,7 @@ object Bagel extends Logging {
     var msgs = messages
     var noActivity = false
     do {
-      logInfo("Starting superstep "+superstep+".")
+      logInfo("Starting superstep " + superstep + ".")
       val startTime = System.currentTimeMillis
 
       val aggregated = agg(verts, aggregator)
@@ -97,7 +98,8 @@ object Bagel extends Logging {
     verts
   }
 
-  /** Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]] and the default storage level */
+  /** Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]] and the default
+    * storage level */
   def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest](
     sc: SparkContext,
     vertices: RDD[(K, V)],
@@ -106,8 +108,8 @@ object Bagel extends Logging {
     partitioner: Partitioner,
     numPartitions: Int
   )(
-    compute: (V, Option[C], Int) => (V, Array[M])
-  ): RDD[(K, V)] = run(sc, vertices, messages, combiner, numPartitions, DEFAULT_STORAGE_LEVEL)(compute)
+    compute: (V, Option[C], Int) => (V, Array[M])): RDD[(K, V)] = run(sc, vertices, messages,
+        combiner, numPartitions, DEFAULT_STORAGE_LEVEL)(compute)
 
   /** Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]] */
   def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest](
@@ -127,8 +129,8 @@ object Bagel extends Logging {
   }
 
   /**
-   * Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]], default [[org.apache.spark.HashPartitioner]]
-   * and default storage level
+   * Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]], default
+   * [[org.apache.spark.HashPartitioner]] and default storage level
    */
   def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest](
     sc: SparkContext,
@@ -138,9 +140,13 @@ object Bagel extends Logging {
     numPartitions: Int
   )(
     compute: (V, Option[C], Int) => (V, Array[M])
-  ): RDD[(K, V)] = run(sc, vertices, messages, combiner, numPartitions, DEFAULT_STORAGE_LEVEL)(compute)
+  ): RDD[(K, V)] = run(sc, vertices, messages, combiner, numPartitions,
+      DEFAULT_STORAGE_LEVEL)(compute)
 
-  /** Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]] and the default [[org.apache.spark.HashPartitioner]]*/
+  /**
+   * Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]] and the
+   * default [[org.apache.spark.HashPartitioner]]
+   */
   def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest](
     sc: SparkContext,
     vertices: RDD[(K, V)],
@@ -158,7 +164,8 @@ object Bagel extends Logging {
   }
 
   /**
-   * Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]], default [[org.apache.spark.HashPartitioner]],
+   * Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]],
+   * default [[org.apache.spark.HashPartitioner]],
    * [[org.apache.spark.bagel.DefaultCombiner]] and the default storage level
    */
   def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest](
@@ -171,7 +178,8 @@ object Bagel extends Logging {
   ): RDD[(K, V)] = run(sc, vertices, messages, numPartitions, DEFAULT_STORAGE_LEVEL)(compute)
 
   /**
-   * Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]], the default [[org.apache.spark.HashPartitioner]]
+   * Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]],
+   * the default [[org.apache.spark.HashPartitioner]]
    * and [[org.apache.spark.bagel.DefaultCombiner]]
    */
   def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest](
@@ -227,8 +235,9 @@ object Bagel extends Logging {
           })
 
         numMsgs += newMsgs.size
-        if (newVert.active)
+        if (newVert.active) {
           numActiveVerts += 1
+        }
 
         Some((newVert, newMsgs))
     }.persist(storageLevel)
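
For reference, the simplest overload above (no Aggregator, the default HashPartitioner,
DefaultCombiner and the default storage level) is used roughly as follows. This is an
illustrative sketch only: PRVertex, PRMessage and the tiny two-vertex graph are made up for
the example and are not part of the patch.

  import org.apache.spark.SparkContext
  import org.apache.spark.bagel.{Bagel, Message, Vertex}

  // Hypothetical vertex and message types for illustration.
  case class PRVertex(value: Double, outEdges: Array[String],
      active: Boolean = true) extends Vertex
  case class PRMessage(targetId: String, value: Double) extends Message[String]

  def pageRankSketch(sc: SparkContext): Unit = {
    val vertices = sc.parallelize(Seq(
      "a" -> PRVertex(1.0, Array("b")),
      "b" -> PRVertex(1.0, Array("a"))))
    // Start with an empty message RDD, as the scaladoc above suggests.
    val messages = sc.parallelize(Array[(String, PRMessage)]())

    val result = Bagel.run(sc, vertices, messages, numPartitions = 2) {
      (self: PRVertex, msgs: Option[Array[PRMessage]], superstep: Int) =>
        val newValue = msgs.map(_.map(_.value).sum).getOrElse(self.value)
        if (superstep < 10) {
          (PRVertex(newValue, self.outEdges, active = true),
            self.outEdges.map(d => PRMessage(d, newValue / self.outEdges.size)))
        } else {
          // Go inactive and stop sending messages so the run terminates.
          (PRVertex(newValue, self.outEdges, active = false), Array[PRMessage]())
        }
    }
    result.collect().foreach(println)
  }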

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/CacheManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala
index 8e5dd8a..15a0d24 100644
--- a/core/src/main/scala/org/apache/spark/CacheManager.scala
+++ b/core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -31,8 +31,8 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
   private val loading = new HashSet[RDDBlockId]()
 
   /** Gets or computes an RDD split. Used by RDD.iterator() when an RDD is cached. */
-  def getOrCompute[T](rdd: RDD[T], split: Partition, context: TaskContext, storageLevel: StorageLevel)
-      : Iterator[T] = {
+  def getOrCompute[T](rdd: RDD[T], split: Partition, context: TaskContext,
+    storageLevel: StorageLevel): Iterator[T] = {
     val key = RDDBlockId(rdd.id, split.index)
     logDebug("Looking for partition " + key)
     blockManager.get(key) match {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/FetchFailedException.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/FetchFailedException.scala b/core/src/main/scala/org/apache/spark/FetchFailedException.scala
index d242047..8eaa26b 100644
--- a/core/src/main/scala/org/apache/spark/FetchFailedException.scala
+++ b/core/src/main/scala/org/apache/spark/FetchFailedException.scala
@@ -25,7 +25,8 @@ private[spark] class FetchFailedException(
     cause: Throwable)
   extends Exception {
 
-  def this (bmAddress: BlockManagerId, shuffleId: Int, mapId: Int, reduceId: Int, cause: Throwable) =
+  def this (bmAddress: BlockManagerId, shuffleId: Int, mapId: Int, reduceId: Int,
+      cause: Throwable) =
     this(FetchFailed(bmAddress, shuffleId, mapId, reduceId),
       "Fetch failed: %s %d %d %d".format(bmAddress, shuffleId, mapId, reduceId),
       cause)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index 30d182b..8d6db0f 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -29,7 +29,7 @@ import akka.pattern.ask
 
 import org.apache.spark.scheduler.MapStatus
 import org.apache.spark.storage.BlockManagerId
-import org.apache.spark.util.{AkkaUtils, MetadataCleaner, MetadataCleanerType, TimeStampedHashMap, Utils}
+import org.apache.spark.util.{AkkaUtils, MetadataCleaner, MetadataCleanerType, TimeStampedHashMap}
 
 private[spark] sealed trait MapOutputTrackerMessage
 private[spark] case class GetMapOutputStatuses(shuffleId: Int)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 566472e..25f7a5e 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -63,9 +63,9 @@ import org.apache.spark.util.{Utils, TimeStampedHashMap, MetadataCleaner, Metada
  */
 class SparkContext(
     config: SparkConf,
-    // This is used only by YARN for now, but should be relevant to other cluster types (Mesos, etc)
-    // too. This is typically generated from InputFormatInfo.computePreferredLocations. It contains
-    // a map from hostname to a list of input format splits on the host.
+    // This is used only by YARN for now, but should be relevant to other cluster types (Mesos,
+    // etc) too. This is typically generated from InputFormatInfo.computePreferredLocations. It
+    // contains a map from hostname to a list of input format splits on the host.
     val preferredNodeLocationData: Map[String, Set[SplitInfo]] = Map())
   extends Logging {
 
@@ -552,10 +552,11 @@ class SparkContext(
 
   /**
    * Load an RDD saved as a SequenceFile containing serialized objects, with NullWritable keys and
-   * BytesWritable values that contain a serialized partition. This is still an experimental storage
-   * format and may not be supported exactly as is in future Spark releases. It will also be pretty
-   * slow if you use the default serializer (Java serialization), though the nice thing about it is
-   * that there's very little effort required to save arbitrary objects.
+   * BytesWritable values that contain a serialized partition. This is still an experimental
+   * storage format and may not be supported exactly as is in future Spark releases. It will also
+   * be pretty slow if you use the default serializer (Java serialization),
+   * though the nice thing about it is that there's very little effort required to save arbitrary
+   * objects.
    */
   def objectFile[T: ClassTag](
       path: String,
@@ -1043,7 +1044,7 @@ object SparkContext {
 
   implicit object LongAccumulatorParam extends AccumulatorParam[Long] {
     def addInPlace(t1: Long, t2: Long) = t1 + t2
-    def zero(initialValue: Long) = 0l
+    def zero(initialValue: Long) = 0L
   }
 
   implicit object FloatAccumulatorParam extends AccumulatorParam[Float] {
@@ -1109,7 +1110,8 @@ object SparkContext {
 
   implicit def floatWritableConverter() = simpleWritableConverter[Float, FloatWritable](_.get)
 
-  implicit def booleanWritableConverter() = simpleWritableConverter[Boolean, BooleanWritable](_.get)
+  implicit def booleanWritableConverter() =
+    simpleWritableConverter[Boolean, BooleanWritable](_.get)
 
   implicit def bytesWritableConverter() = {
     simpleWritableConverter[Array[Byte], BytesWritable](_.getBytes)
@@ -1258,7 +1260,8 @@ object SparkContext {
 
       case "yarn-client" =>
         val scheduler = try {
-          val clazz = Class.forName("org.apache.spark.scheduler.cluster.YarnClientClusterScheduler")
+          val clazz =
+            Class.forName("org.apache.spark.scheduler.cluster.YarnClientClusterScheduler")
           val cons = clazz.getConstructor(classOf[SparkContext])
           cons.newInstance(sc).asInstanceOf[TaskSchedulerImpl]
 
@@ -1269,7 +1272,8 @@ object SparkContext {
         }
 
         val backend = try {
-          val clazz = Class.forName("org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend")
+          val clazz =
+            Class.forName("org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend")
           val cons = clazz.getConstructor(classOf[TaskSchedulerImpl], classOf[SparkContext])
           cons.newInstance(scheduler, sc).asInstanceOf[CoarseGrainedSchedulerBackend]
         } catch {
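
The objectFile scaladoc above pairs with RDD.saveAsObjectFile. A minimal round-trip sketch,
assuming a SparkContext named sc is already in scope (the path is illustrative):

  // Write an RDD as a SequenceFile of serialized objects, then read it back.
  val path = "/tmp/ints-objectfile"
  sc.parallelize(1 to 100).saveAsObjectFile(path)
  val restored = sc.objectFile[Int](path)   // slow-ish: Java serialization by default
  assert(restored.count() == 100)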

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/SparkEnv.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index ed78856..6ae020f 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -96,7 +96,7 @@ object SparkEnv extends Logging {
   @volatile private var lastSetSparkEnv : SparkEnv = _
 
   def set(e: SparkEnv) {
-	  lastSetSparkEnv = e
+    lastSetSparkEnv = e
     env.set(e)
   }
 
@@ -112,7 +112,7 @@ object SparkEnv extends Logging {
    * Returns the ThreadLocal SparkEnv.
    */
   def getThreadLocal: SparkEnv = {
-	  env.get()
+    env.get()
   }
 
   private[spark] def create(
@@ -168,7 +168,8 @@ object SparkEnv extends Logging {
     val blockManagerMaster = new BlockManagerMaster(registerOrLookup(
       "BlockManagerMaster",
       new BlockManagerMasterActor(isLocal, conf)), conf)
-    val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster, serializer, conf)
+    val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster,
+      serializer, conf)
 
     val connectionManager = blockManager.connectionManager
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala
index b0dedc6..33737e1 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala
@@ -148,8 +148,8 @@ class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[Double, Jav
   def sum(): Double = srdd.sum()
 
   /**
-   * Return a [[org.apache.spark.util.StatCounter]] object that captures the mean, variance and count
-   * of the RDD's elements in one operation.
+   * Return a [[org.apache.spark.util.StatCounter]] object that captures the mean, variance and
+   * count of the RDD's elements in one operation.
    */
   def stats(): StatCounter = srdd.stats()
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
index f430a33..5b1bf94 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -88,7 +88,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kClassTag: ClassTag[K
   /**
    * Return a new RDD containing the distinct elements in this RDD.
    */
-  def distinct(numPartitions: Int): JavaPairRDD[K, V] = new JavaPairRDD[K, V](rdd.distinct(numPartitions))
+  def distinct(numPartitions: Int): JavaPairRDD[K, V] =
+      new JavaPairRDD[K, V](rdd.distinct(numPartitions))
 
   /**
    * Return a new RDD containing only the elements that satisfy a predicate.
@@ -210,25 +211,25 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kClassTag: ClassTag[K
     rdd.countByKeyApprox(timeout, confidence).map(mapAsJavaMap)
 
   /**
-   * Merge the values for each key using an associative function and a neutral "zero value" which may
-   * be added to the result an arbitrary number of times, and must not change the result (e.g., Nil for
-   * list concatenation, 0 for addition, or 1 for multiplication.).
+   * Merge the values for each key using an associative function and a neutral "zero value" which
+   * may be added to the result an arbitrary number of times, and must not change the result
+   * (e.g ., Nil for list concatenation, 0 for addition, or 1 for multiplication.).
    */
-  def foldByKey(zeroValue: V, partitioner: Partitioner, func: JFunction2[V, V, V]): JavaPairRDD[K, V] =
-    fromRDD(rdd.foldByKey(zeroValue, partitioner)(func))
+  def foldByKey(zeroValue: V, partitioner: Partitioner, func: JFunction2[V, V, V])
+  : JavaPairRDD[K, V] = fromRDD(rdd.foldByKey(zeroValue, partitioner)(func))
 
   /**
-   * Merge the values for each key using an associative function and a neutral "zero value" which may
-   * be added to the result an arbitrary number of times, and must not change the result (e.g., Nil for
-   * list concatenation, 0 for addition, or 1 for multiplication.).
+   * Merge the values for each key using an associative function and a neutral "zero value" which
+   * may be added to the result an arbitrary number of times, and must not change the result
+   * (e.g ., Nil for list concatenation, 0 for addition, or 1 for multiplication.).
    */
   def foldByKey(zeroValue: V, numPartitions: Int, func: JFunction2[V, V, V]): JavaPairRDD[K, V] =
     fromRDD(rdd.foldByKey(zeroValue, numPartitions)(func))
 
   /**
-   * Merge the values for each key using an associative function and a neutral "zero value" which may
-   * be added to the result an arbitrary number of times, and must not change the result (e.g., Nil for
-   * list concatenation, 0 for addition, or 1 for multiplication.).
+   * Merge the values for each key using an associative function and a neutral "zero value"
+   * which may be added to the result an arbitrary number of times, and must not change the result
+   * (e.g., Nil for list concatenation, 0 for addition, or 1 for multiplication.).
    */
   def foldByKey(zeroValue: V, func: JFunction2[V, V, V]): JavaPairRDD[K, V] =
     fromRDD(rdd.foldByKey(zeroValue)(func))
@@ -375,7 +376,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kClassTag: ClassTag[K
    * pair (k, (v, None)) if no elements in `other` have key k. Hash-partitions the output
    * into `numPartitions` partitions.
    */
-  def leftOuterJoin[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, (V, Optional[W])] = {
+  def leftOuterJoin[W](other: JavaPairRDD[K, W], numPartitions: Int)
+  : JavaPairRDD[K, (V, Optional[W])] = {
     val joinResult = rdd.leftOuterJoin(other, numPartitions)
     fromRDD(joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))})
   }
@@ -397,7 +399,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kClassTag: ClassTag[K
    * pair (k, (None, w)) if no elements in `this` have key k. Hash-partitions the resulting
    * RDD into the given number of partitions.
    */
-  def rightOuterJoin[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, (Optional[V], W)] = {
+  def rightOuterJoin[W](other: JavaPairRDD[K, W], numPartitions: Int)
+  : JavaPairRDD[K, (Optional[V], W)] = {
     val joinResult = rdd.rightOuterJoin(other, numPartitions)
     fromRDD(joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)})
   }
@@ -439,8 +442,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kClassTag: ClassTag[K
    * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
    * tuple with the list of values for that key in `this`, `other1` and `other2`.
    */
-  def cogroup[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2], partitioner: Partitioner)
-  : JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] =
+  def cogroup[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2],
+      partitioner: Partitioner): JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] =
     fromRDD(cogroupResult2ToJava(rdd.cogroup(other1, other2, partitioner)))
 
   /**
@@ -462,8 +465,9 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kClassTag: ClassTag[K
    * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
    * list of values for that key in `this` as well as `other`.
    */
-  def cogroup[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, (JList[V], JList[W])]
-  = fromRDD(cogroupResultToJava(rdd.cogroup(other, numPartitions)))
+  def cogroup[W](other: JavaPairRDD[K, W], numPartitions: Int)
+  : JavaPairRDD[K, (JList[V], JList[W])] =
+    fromRDD(cogroupResultToJava(rdd.cogroup(other, numPartitions)))
 
   /**
    * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
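
The foldByKey wrappers in this file delegate to RDD.foldByKey on the Scala side (visible in
the hunks above), and the zero-value contract the scaladoc describes is easiest to see with
addition. A small sketch, assuming a SparkContext named sc is in scope:

  import org.apache.spark.SparkContext._   // brings in the pair-RDD functions

  val pairs = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3)))
  // 0 is a neutral zero value for +: folding it in any number of times
  // leaves the per-key result unchanged.
  val sums = pairs.foldByKey(0)(_ + _)
  // sums.collect() => Array(("a", 3), ("b", 6)) (ordering may vary)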

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index 4db7339..fcb9729 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -76,7 +76,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
       f: JFunction2[Int, java.util.Iterator[T], java.util.Iterator[R]],
       preservesPartitioning: Boolean = false): JavaRDD[R] =
     new JavaRDD(rdd.mapPartitionsWithIndex(((a,b) => f(a,asJavaIterator(b))),
-					   preservesPartitioning))
+        preservesPartitioning))
 
   /**
    * Return a new RDD by applying a function to all elements of this RDD.
@@ -134,7 +134,8 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
   /**
    * Return a new RDD by applying a function to each partition of this RDD.
    */
-  def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U], preservesPartitioning: Boolean): JavaRDD[U] = {
+  def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U],
+      preservesPartitioning: Boolean): JavaRDD[U] = {
     def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator())
     JavaRDD.fromRDD(rdd.mapPartitions(fn, preservesPartitioning)(f.elementType()))(f.elementType())
   }
@@ -160,16 +161,18 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
   /**
    * Return a new RDD by applying a function to each partition of this RDD.
    */
-  def mapPartitions(f: DoubleFlatMapFunction[java.util.Iterator[T]], preservesPartitioning: Boolean): JavaDoubleRDD = {
+  def mapPartitions(f: DoubleFlatMapFunction[java.util.Iterator[T]],
+    preservesPartitioning: Boolean): JavaDoubleRDD = {
     def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator())
-    new JavaDoubleRDD(rdd.mapPartitions(fn, preservesPartitioning).map((x: java.lang.Double) => x.doubleValue()))
+    new JavaDoubleRDD(rdd.mapPartitions(fn, preservesPartitioning)
+      .map((x: java.lang.Double) => x.doubleValue()))
   }
 
   /**
    * Return a new RDD by applying a function to each partition of this RDD.
    */
-  def mapPartitions[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2], preservesPartitioning: Boolean):
-  JavaPairRDD[K2, V2] = {
+  def mapPartitions[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2],
+      preservesPartitioning: Boolean): JavaPairRDD[K2, V2] = {
     def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator())
     JavaPairRDD.fromRDD(rdd.mapPartitions(fn, preservesPartitioning))(f.keyType(), f.valueType())
   }
@@ -294,7 +297,8 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
   }
 
   /**
-   * Reduces the elements of this RDD using the specified commutative and associative binary operator.
+   * Reduces the elements of this RDD using the specified commutative and associative binary
+   * operator.
    */
   def reduce(f: JFunction2[T, T, T]): T = rdd.reduce(f)
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
index 5a426b9..22dc9c9 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -362,15 +362,15 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
     doubleAccumulator(initialValue)
 
   /**
-   * Create an [[org.apache.spark.Accumulator]] variable of a given type, which tasks can "add" values
-   * to using the `add` method. Only the master can access the accumulator's `value`.
+   * Create an [[org.apache.spark.Accumulator]] variable of a given type, which tasks can "add"
+   * values to using the `add` method. Only the master can access the accumulator's `value`.
    */
   def accumulator[T](initialValue: T, accumulatorParam: AccumulatorParam[T]): Accumulator[T] =
     sc.accumulator(initialValue)(accumulatorParam)
 
   /**
-   * Create an [[org.apache.spark.Accumulable]] shared variable of the given type, to which tasks can
-   * "add" values with `add`. Only the master can access the accumuable's `value`.
+   * Create an [[org.apache.spark.Accumulable]] shared variable of the given type, to which tasks
+   * can "add" values with `add`. Only the master can access the accumuable's `value`.
    */
   def accumulable[T, R](initialValue: T, param: AccumulableParam[T, R]): Accumulable[T, R] =
     sc.accumulable(initialValue)(param)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/api/python/PythonPartitioner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonPartitioner.scala b/core/src/main/scala/org/apache/spark/api/python/PythonPartitioner.scala
index 2be4e32..35eca62 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonPartitioner.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonPartitioner.scala
@@ -23,7 +23,8 @@ import org.apache.spark.Partitioner
 import org.apache.spark.util.Utils
 
 /**
- * A [[org.apache.spark.Partitioner]] that performs handling of long-valued keys, for use by the Python API.
+ * A [[org.apache.spark.Partitioner]] that performs handling of long-valued keys, for use by the
+ * Python API.
  *
  * Stores the unique id() of the Python-side partitioning function so that it is incorporated into
  * equality comparisons.  Correctness requires that the id is a unique identifier for the

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 9cbd26b..33667a9 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -91,8 +91,9 @@ private[spark] class PythonRDD[T: ClassTag](
             // Kill the Python worker process:
             worker.shutdownOutput()
           case e: IOException =>
-            // This can happen for legitimate reasons if the Python code stops returning data before we are done
-            // passing elements through, e.g., for take(). Just log a message to say it happened.
+            // This can happen for legitimate reasons if the Python code stops returning data
+            // before we are done passing elements through, e.g., for take(). Just log a message
+            // to say it happened.
             logInfo("stdin writer to Python finished early")
             logDebug("stdin writer to Python finished early", e)
         }
@@ -132,7 +133,8 @@ private[spark] class PythonRDD[T: ClassTag](
               val init = initTime - bootTime
               val finish = finishTime - initTime
               val total = finishTime - startTime
-              logInfo("Times: total = %s, boot = %s, init = %s, finish = %s".format(total, boot, init, finish))
+              logInfo("Times: total = %s, boot = %s, init = %s, finish = %s".format(total, boot,
+                init, finish))
               read
             case SpecialLengths.PYTHON_EXCEPTION_THROWN =>
               // Signals that an exception has been thrown in python
@@ -184,7 +186,7 @@ private class PairwiseRDD(prev: RDD[Array[Byte]]) extends
   override def compute(split: Partition, context: TaskContext) =
     prev.iterator(split, context).grouped(2).map {
       case Seq(a, b) => (Utils.deserializeLongValue(a), b)
-      case x          => throw new SparkException("PairwiseRDD: unexpected value: " + x)
+      case x => throw new SparkException("PairwiseRDD: unexpected value: " + x)
     }
   val asJavaPairRDD : JavaPairRDD[Long, Array[Byte]] = JavaPairRDD.fromRDD(this)
 }
@@ -274,7 +276,8 @@ private[spark] object PythonRDD {
 
 }
 
-private class BytesToString extends org.apache.spark.api.java.function.Function[Array[Byte], String] {
+private
+class BytesToString extends org.apache.spark.api.java.function.Function[Array[Byte], String] {
   override def call(arr: Array[Byte]) : String = new String(arr, "UTF-8")
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
index d351dfc..ec99725 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
@@ -187,8 +187,9 @@ extends Logging {
     val bais = new ByteArrayInputStream(byteArray)
 
     var blockNum = (byteArray.length / BLOCK_SIZE)
-    if (byteArray.length % BLOCK_SIZE != 0)
+    if (byteArray.length % BLOCK_SIZE != 0) {
       blockNum += 1
+    }
 
     var retVal = new Array[TorrentBlock](blockNum)
     var blockID = 0

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala b/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala
index db67c6d..3db970c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala
@@ -101,16 +101,16 @@ private[spark] class ClientArguments(args: Array[String]) {
     // TODO: It wouldn't be too hard to allow users to submit their app and dependency jars
     //       separately similar to in the YARN client.
     val usage =
-      s"""
-        |Usage: DriverClient [options] launch <active-master> <jar-url> <main-class> [driver options]
-        |Usage: DriverClient kill <active-master> <driver-id>
-        |
-        |Options:
-        |   -c CORES, --cores CORES        Number of cores to request (default: $defaultCores)
-        |   -m MEMORY, --memory MEMORY     Megabytes of memory to request (default: $defaultMemory)
-        |   -s, --supervise                Whether to restart the driver on failure
-        |   -v, --verbose                  Print more debugging output
-      """.stripMargin
+     s"""
+      |Usage: DriverClient [options] launch <active-master> <jar-url> <main-class> [driver options]
+      |Usage: DriverClient kill <active-master> <driver-id>
+      |
+      |Options:
+      |   -c CORES, --cores CORES        Number of cores to request (default: $defaultCores)
+      |   -m MEMORY, --memory MEMORY     Megabytes of memory to request (default: $defaultMemory)
+      |   -s, --supervise                Whether to restart the driver on failure
+      |   -v, --verbose                  Print more debugging output
+     """.stripMargin
     System.err.println(usage)
     System.exit(exitCode)
   }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
index 4dfb19e..7de7c48 100644
--- a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
@@ -1,20 +1,18 @@
 /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
  *
- *  Licensed to the Apache Software Foundation (ASF) under one or more
- *  contributor license agreements.  See the NOTICE file distributed with
- *  this work for additional information regarding copyright ownership.
- *  The ASF licenses this file to You under the Apache License, Version 2.0
- *  (the "License"); you may not use this file except in compliance with
- *  the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ *    http://www.apache.org/licenses/LICENSE-2.0
  *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 
 package org.apache.spark.deploy
@@ -306,7 +304,8 @@ private[spark] object FaultToleranceTest extends App with Logging {
     }
   }
 
-  logInfo("Ran %s tests, %s passed and %s failed".format(numPassed+numFailed, numPassed, numFailed))
+  logInfo("Ran %s tests, %s passed and %s failed".format(numPassed + numFailed, numPassed,
+    numFailed))
 }
 
 private[spark] class TestMasterInfo(val ip: String, val dockerId: DockerId, val logFile: File)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
index ffc0cb0..488843a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
@@ -33,7 +33,8 @@ import scala.collection.mutable.ArrayBuffer
  * fault recovery without spinning up a lot of processes.
  */
 private[spark]
-class LocalSparkCluster(numWorkers: Int, coresPerWorker: Int, memoryPerWorker: Int) extends Logging {
+class LocalSparkCluster(numWorkers: Int, coresPerWorker: Int, memoryPerWorker: Int)
+  extends Logging {
 
   private val localHostname = Utils.localHostName()
   private val masterActorSystems = ArrayBuffer[ActorSystem]()

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
index 1415e2f..8901806 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
@@ -132,7 +132,8 @@ private[spark] class AppClient(
 
       case ExecutorAdded(id: Int, workerId: String, hostPort: String, cores: Int, memory: Int) =>
         val fullId = appId + "/" + id
-        logInfo("Executor added: %s on %s (%s) with %d cores".format(fullId, workerId, hostPort, cores))
+        logInfo("Executor added: %s on %s (%s) with %d cores".format(fullId, workerId, hostPort,
+          cores))
         listener.executorAdded(fullId, workerId, hostPort, cores, memory)
 
       case ExecutorUpdated(id, state, message, exitStatus) =>

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/deploy/client/AppClientListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClientListener.scala b/core/src/main/scala/org/apache/spark/deploy/client/AppClientListener.scala
index 55d4ef1..2f2cbd1 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/AppClientListener.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClientListener.scala
@@ -33,7 +33,8 @@ private[spark] trait AppClientListener {
   /** Dead means that we couldn't find any Masters to connect to, and have given up. */
   def dead(): Unit
 
-  def executorAdded(fullId: String, workerId: String, hostPort: String, cores: Int, memory: Int): Unit
+  def executorAdded(
+      fullId: String, workerId: String, hostPort: String, cores: Int, memory: Int): Unit
 
   def executorRemoved(fullId: String, message: String, exitStatus: Option[Int]): Unit
 }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 2ef167f..82bf655 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -149,10 +149,11 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
   override def receive = {
     case ElectedLeader => {
       val (storedApps, storedDrivers, storedWorkers) = persistenceEngine.readPersistedData()
-      state = if (storedApps.isEmpty && storedDrivers.isEmpty && storedWorkers.isEmpty)
+      state = if (storedApps.isEmpty && storedDrivers.isEmpty && storedWorkers.isEmpty) {
         RecoveryState.ALIVE
-      else
+      } else {
         RecoveryState.RECOVERING
+      }
       logInfo("I have been elected leader! New state: " + state)
       if (state == RecoveryState.RECOVERING) {
         beginRecovery(storedApps, storedDrivers, storedWorkers)
@@ -165,7 +166,8 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
       System.exit(0)
     }
 
-    case RegisterWorker(id, workerHost, workerPort, cores, memory, workerWebUiPort, publicAddress) => {
+    case RegisterWorker(id, workerHost, workerPort, cores, memory, workerWebUiPort, publicAddress)
+    => {
       logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
         host, workerPort, cores, Utils.megabytesToString(memory)))
       if (state == RecoveryState.STANDBY) {
@@ -181,9 +183,10 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
           schedule()
         } else {
           val workerAddress = worker.actor.path.address
-          logWarning("Worker registration failed. Attempted to re-register worker at same address: " +
-            workerAddress)
-          sender ! RegisterWorkerFailed("Attempted to re-register worker at same address: " + workerAddress)
+          logWarning("Worker registration failed. Attempted to re-register worker at same " +
+            "address: " + workerAddress)
+          sender ! RegisterWorkerFailed("Attempted to re-register worker at same address: "
+            + workerAddress)
         }
       }
     }
@@ -641,8 +644,9 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
           worker.id, WORKER_TIMEOUT/1000))
         removeWorker(worker)
       } else {
-        if (worker.lastHeartbeat < currentTime - ((REAPER_ITERATIONS + 1) * WORKER_TIMEOUT))
+        if (worker.lastHeartbeat < currentTime - ((REAPER_ITERATIONS + 1) * WORKER_TIMEOUT)) {
           workers -= worker // we've seen this DEAD worker in the UI, etc. for long enough; cull it
+        }
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
index a9af8df..64ecf22 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
@@ -57,7 +57,8 @@ private[spark] class IndexPage(parent: MasterWebUI) {
     val completedApps = state.completedApps.sortBy(_.endTime).reverse
     val completedAppsTable = UIUtils.listingTable(appHeaders, appRow, completedApps)
 
-    val driverHeaders = Seq("ID", "Submitted Time", "Worker", "State", "Cores", "Memory", "Main Class")
+    val driverHeaders = Seq("ID", "Submitted Time", "Worker", "State", "Cores", "Memory",
+      "Main Class")
     val activeDrivers = state.activeDrivers.sortBy(_.startTime).reverse
     val activeDriversTable = UIUtils.listingTable(driverHeaders, driverRow, activeDrivers)
     val completedDrivers = state.completedDrivers.sortBy(_.startTime).reverse
@@ -103,13 +104,14 @@ private[spark] class IndexPage(parent: MasterWebUI) {
         </div>
 
         <div>
-          {if (hasDrivers)
-          <div class="row-fluid">
-            <div class="span12">
-              <h4> Running Drivers </h4>
-              {activeDriversTable}
-            </div>
-          </div>
+          {if (hasDrivers) {
+             <div class="row-fluid">
+               <div class="span12">
+                 <h4> Running Drivers </h4>
+                 {activeDriversTable}
+               </div>
+             </div>
+           }
           }
         </div>
 
@@ -121,13 +123,14 @@ private[spark] class IndexPage(parent: MasterWebUI) {
         </div>
 
         <div>
-          {if (hasDrivers)
-          <div class="row-fluid">
-            <div class="span12">
-              <h4> Completed Drivers </h4>
-              {completedDriversTable}
-            </div>
-          </div>
+          {if (hasDrivers) {
+              <div class="row-fluid">
+                <div class="span12">
+                  <h4> Completed Drivers </h4>
+                  {completedDriversTable}
+                </div>
+              </div>
+            }
           }
         </div>;
 
@@ -175,7 +178,8 @@ private[spark] class IndexPage(parent: MasterWebUI) {
     <tr>
       <td>{driver.id} </td>
       <td>{driver.submitDate}</td>
-      <td>{driver.worker.map(w => <a href={w.webUiAddress}>{w.id.toString}</a>).getOrElse("None")}</td>
+      <td>{driver.worker.map(w => <a href={w.webUiAddress}>{w.id.toString}</a>).getOrElse("None")}
+      </td>
       <td>{driver.state}</td>
       <td sorttable_customkey={driver.desc.cores.toString}>
         {driver.desc.cores}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
index 460883e..f411eb9 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
@@ -49,7 +49,8 @@ object CommandUtils extends Logging {
     val libraryOpts = getEnv("SPARK_LIBRARY_PATH", command)
       .map(p => List("-Djava.library.path=" + p))
       .getOrElse(Nil)
-    val workerLocalOpts = Option(getenv("SPARK_JAVA_OPTS")).map(Utils.splitCommandString).getOrElse(Nil)
+    val workerLocalOpts = Option(getenv("SPARK_JAVA_OPTS"))
+            .map(Utils.splitCommandString).getOrElse(Nil)
     val userOpts = getEnv("SPARK_JAVA_OPTS", command).map(Utils.splitCommandString).getOrElse(Nil)
     val memoryOpts = Seq(s"-Xms${memory}M", s"-Xmx${memory}M")
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
index 6f6c101..a26e479 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
@@ -45,4 +45,4 @@ object DriverWrapper {
         System.exit(-1)
     }
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala
index 1dc39c4..530c147 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala
@@ -69,4 +69,4 @@ private[spark] class WorkerWatcher(workerUrl: String) extends Actor
 
     case e => logWarning(s"Received unexpected actor system event: $e")
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala
index 925c6fb..3089acf 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala
@@ -84,7 +84,7 @@ private[spark] class IndexPage(parent: WorkerWebUI) {
             {runningExecutorTable}
           </div>
         </div>
-
+        // scalastyle:off
         <div>
           {if (hasDrivers)
             <div class="row-fluid"> <!-- Running Drivers -->
@@ -113,7 +113,7 @@ private[spark] class IndexPage(parent: WorkerWebUI) {
             </div>
           }
         </div>;
-
+    // scalastyle:on
     UIUtils.basicSparkPage(content, "Spark Worker at %s:%s".format(
       workerState.host, workerState.port))
   }
@@ -133,10 +133,10 @@ private[spark] class IndexPage(parent: WorkerWebUI) {
         </ul>
       </td>
       <td>
-	 <a href={"logPage?appId=%s&executorId=%s&logType=stdout"
-          .format(executor.appId, executor.execId)}>stdout</a>
-	 <a href={"logPage?appId=%s&executorId=%s&logType=stderr"
-          .format(executor.appId, executor.execId)}>stderr</a>
+     <a href={"logPage?appId=%s&executorId=%s&logType=stdout"
+        .format(executor.appId, executor.execId)}>stdout</a>
+     <a href={"logPage?appId=%s&executorId=%s&logType=stderr"
+        .format(executor.appId, executor.execId)}>stderr</a>
       </td> 
     </tr>
 

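The worker IndexPage hunk uses the other escape hatch the new build supports: bracketing a region with
scalastyle comment filters instead of rewrapping it. A minimal sketch of that mechanism, with a
deliberately over-long line that the length rule would otherwise flag; the object and string are
illustrative only.

object ScalastyleEscapeHatchSketch {
  def main(args: Array[String]): Unit = {
    // scalastyle:off
    val banner = "This single line is intentionally left much longer than one hundred characters so that the line length rule would normally reject it."
    // scalastyle:on
    println(banner.length)
  }
}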
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
index c23b75d..86688e4 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
@@ -187,7 +187,7 @@ class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[I
 
     val logPageLength = math.min(byteLength, maxBytes)
 
-    val endByte = math.min(startByte+logPageLength, logLength)
+    val endByte = math.min(startByte + logPageLength, logLength)
 
     (startByte, endByte)
   }

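The WorkerWebUI hunk is purely about spaces around the + operator. For reference, the touched arithmetic
as a stand-alone helper; the wrapper object, parameter list, and sample values are assumptions, and the
real getByteRange computes startByte earlier in the method.

object LogPageRangeSketch {
  // Only the two lines from the hunk above, wrapped in a toy function.
  def endOfPage(startByte: Long, byteLength: Int, logLength: Long, maxBytes: Int): Long = {
    val logPageLength = math.min(byteLength, maxBytes)
    math.min(startByte + logPageLength, logLength)
  }

  def main(args: Array[String]): Unit =
    println(endOfPage(startByte = 0L, byteLength = 4096, logLength = 1500L, maxBytes = 1024))
}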
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index f7efd74..989d666 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -205,7 +205,7 @@ private[spark] class Executor(
         }
 
         attemptedTask = Some(task)
-        logDebug("Task " + taskId +"'s epoch is " + task.epoch)
+        logDebug("Task " + taskId + "'s epoch is " + task.epoch)
         env.mapOutputTracker.updateEpoch(task.epoch)
 
         // Run the actual task and measure its runtime.
@@ -233,7 +233,8 @@ private[spark] class Executor(
 
         val accumUpdates = Accumulators.values
 
-        val directResult = new DirectTaskResult(valueBytes, accumUpdates, task.metrics.getOrElse(null))
+        val directResult = new DirectTaskResult(valueBytes, accumUpdates,
+          task.metrics.getOrElse(null))
         val serializedDirectResult = ser.serialize(directResult)
         logInfo("Serialized size of result for " + taskId + " is " + serializedDirectResult.limit)
         val serializedResult = {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala
index e5c9bbb..210f3db 100644
--- a/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala
+++ b/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala
@@ -50,10 +50,11 @@ object ExecutorExitCode {
         "Failed to create local directory (bad spark.local.dir?)"
       case _ => 
         "Unknown executor exit code (" + exitCode + ")" + (
-          if (exitCode > 128)
+          if (exitCode > 128) {
             " (died from signal " + (exitCode - 128) + "?)"
-          else
+          } else {
             ""
+          }
         )
     }
   }

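The ExecutorExitCode change adds braces to a multi-line if/else inside a string expression; the message
built is identical. A self-contained sketch of the braced form (the wrapper object, method name, and
sample exit codes are mine; the message text follows the hunk).

object ExitCodeMessageSketch {
  def explain(exitCode: Int): String =
    "Unknown executor exit code (" + exitCode + ")" + (
      if (exitCode > 128) {
        " (died from signal " + (exitCode - 128) + "?)"
      } else {
        ""
      }
    )

  def main(args: Array[String]): Unit = {
    println(explain(137))  // 137 = 128 + 9, i.e. likely killed by SIGKILL
    println(explain(1))
  }
}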
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala
index 97176e4..c2e973e 100644
--- a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala
+++ b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala
@@ -55,7 +55,8 @@ class ExecutorSource(val executor: Executor, executorId: String) extends Source
     override def getValue: Int = executor.threadPool.getPoolSize()
   })
 
-  // Gauge got executor thread pool's largest number of threads that have ever simultaneously been in th pool
+  // Gauge got executor thread pool's largest number of threads that have ever simultaneously
+  // been in th pool
   metricRegistry.register(MetricRegistry.name("threadpool", "maxPool_size"), new Gauge[Int] {
     override def getValue: Int = executor.threadPool.getMaximumPoolSize()
   })

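The ExecutorSource hunk only rewraps a comment at the column limit; the gauge registration around it is
untouched. A runnable sketch of that registration pattern, assuming Codahale metrics-core on the
classpath and a plain JDK thread pool standing in for the executor's; the object, pool, and printout are
not Spark code.

import java.util.concurrent.{Executors, ThreadPoolExecutor}

import com.codahale.metrics.{Gauge, MetricRegistry}

object ThreadPoolGaugeSketch {
  def main(args: Array[String]): Unit = {
    val registry = new MetricRegistry
    val pool = Executors.newFixedThreadPool(4).asInstanceOf[ThreadPoolExecutor]

    // Gauge exposing the pool's maximum allowed size, mirroring the maxPool_size gauge
    // above; the descriptive comment is wrapped instead of exceeding 100 characters.
    registry.register(MetricRegistry.name("threadpool", "maxPool_size"), new Gauge[Int] {
      override def getValue: Int = pool.getMaximumPoolSize()
    })

    println(registry.getGauges.get("threadpool.maxPool_size").getValue)
    pool.shutdown()
  }
}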
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index 0c8f466..4553399 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -64,7 +64,8 @@ class TaskMetrics extends Serializable {
   var shuffleReadMetrics: Option[ShuffleReadMetrics] = None
 
   /**
-   * If this task writes to shuffle output, metrics on the written shuffle data will be collected here
+   * If this task writes to shuffle output, metrics on the written shuffle data will be collected
+   * here
    */
   var shuffleWriteMetrics: Option[ShuffleWriteMetrics] = None
 }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
index 9930537..de233e4 100644
--- a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
@@ -56,7 +56,8 @@ import org.apache.spark.metrics.source.Source
  * wild card "*" can be used to replace instance name, which means all the instances will have
  * this property.
  *
- * [sink|source] means this property belongs to source or sink. This field can only be source or sink.
+ * [sink|source] means this property belongs to source or sink. This field can only be
+ * source or sink.
  *
  * [name] specify the name of sink or source, it is custom defined.
  *

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/network/Connection.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/Connection.scala b/core/src/main/scala/org/apache/spark/network/Connection.scala
index cba8477..ae2007e 100644
--- a/core/src/main/scala/org/apache/spark/network/Connection.scala
+++ b/core/src/main/scala/org/apache/spark/network/Connection.scala
@@ -211,7 +211,6 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector,
             }
             return chunk
           } else {
-            /*logInfo("Finished sending [" + message + "] to [" + getRemoteConnectionManagerId() + "]")*/
             message.finishTime = System.currentTimeMillis
             logDebug("Finished sending [" + message + "] to [" + getRemoteConnectionManagerId() +
               "] in "  + message.timeTaken )
@@ -238,7 +237,7 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector,
               message.startTime = System.currentTimeMillis
             }
             logTrace(
-              "Sending chunk from [" + message+ "] to [" + getRemoteConnectionManagerId() + "]")
+              "Sending chunk from [" + message + "] to [" + getRemoteConnectionManagerId() + "]")
             return chunk
           } else {
             message.finishTime = System.currentTimeMillis
@@ -349,8 +348,8 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector,
             outbox.getChunk() match {
               case Some(chunk) => {
                 val buffers = chunk.buffers
-                // If we have 'seen' pending messages, then reset flag - since we handle that as normal 
-                // registering of event (below)
+                // If we have 'seen' pending messages, then reset flag - since we handle that as
+                // normal registering of event (below)
                 if (needForceReregister && buffers.exists(_.remaining() > 0)) resetForceReregister()
                 currentBuffers ++= buffers
               }
@@ -404,7 +403,8 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector,
       }
     } catch {
       case e: Exception =>
-        logError("Exception while reading SendingConnection to " + getRemoteConnectionManagerId(), e)
+        logError("Exception while reading SendingConnection to " + getRemoteConnectionManagerId(),
+          e)
         callOnExceptionCallback(e)
         close()
     }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
index e6e0178..24d0a7d 100644
--- a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
+++ b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
@@ -65,7 +65,8 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf) extends Loggi
     conf.getInt("spark.core.connection.io.threads.keepalive", 60), TimeUnit.SECONDS,
     new LinkedBlockingDeque[Runnable]())
 
-  // Use a different, yet smaller, thread pool - infrequently used with very short lived tasks : which should be executed asap
+  // Use a different, yet smaller, thread pool - infrequently used with very short lived tasks :
+  // which should be executed asap
   private val handleConnectExecutor = new ThreadPoolExecutor(
     conf.getInt("spark.core.connection.connect.threads.min", 1),
     conf.getInt("spark.core.connection.connect.threads.max", 8),
@@ -73,8 +74,10 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf) extends Loggi
     new LinkedBlockingDeque[Runnable]())
 
   private val serverChannel = ServerSocketChannel.open()
-  private val connectionsByKey = new HashMap[SelectionKey, Connection] with SynchronizedMap[SelectionKey, Connection]
-  private val connectionsById = new HashMap[ConnectionManagerId, SendingConnection] with SynchronizedMap[ConnectionManagerId, SendingConnection]
+  private val connectionsByKey = new HashMap[SelectionKey, Connection]
+    with SynchronizedMap[SelectionKey, Connection]
+  private val connectionsById = new HashMap[ConnectionManagerId, SendingConnection]
+    with SynchronizedMap[ConnectionManagerId, SendingConnection]
   private val messageStatuses = new HashMap[Int, MessageStatus]
   private val keyInterestChangeRequests = new SynchronizedQueue[(SelectionKey, Int)]
   private val registerRequests = new SynchronizedQueue[SendingConnection]
@@ -173,7 +176,8 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf) extends Loggi
     if (conn == null) return
 
     // prevent other events from being triggered
-    // Since we are still trying to connect, we do not need to do the additional steps in triggerWrite
+    // Since we are still trying to connect, we do not need to do the additional steps in
+    // triggerWrite
     conn.changeConnectionKeyInterest(0)
 
     handleConnectExecutor.execute(new Runnable {
@@ -188,8 +192,8 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf) extends Loggi
         }
 
         // fallback to previous behavior : we should not really come here since this method was
-        // triggered since channel became connectable : but at times, the first finishConnect need not
-        // succeed : hence the loop to retry a few 'times'.
+        // triggered since channel became connectable : but at times, the first finishConnect need
+        // not succeed : hence the loop to retry a few 'times'.
         conn.finishConnect(true)
       }
     } )
@@ -258,8 +262,9 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf) extends Loggi
                     if (opStrs.size > 0) opStrs.reduceLeft(_ + " | " + _) else " "
                   }
 
-                  logTrace("Changed key for connection to [" + connection.getRemoteConnectionManagerId()  +
-                    "] changed from [" + intToOpStr(lastOps) + "] to [" + intToOpStr(ops) + "]")
+                  logTrace("Changed key for connection to [" +
+                    connection.getRemoteConnectionManagerId()  + "] changed from [" +
+                      intToOpStr(lastOps) + "] to [" + intToOpStr(ops) + "]")
                 }
               }
             } else {
@@ -282,7 +287,8 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf) extends Loggi
           try {
             selector.select()
           } catch {
-            // Explicitly only dealing with CancelledKeyException here since other exceptions should be dealt with differently.
+            // Explicitly only dealing with CancelledKeyException here since other exceptions
+            // should be dealt with differently.
             case e: CancelledKeyException => {
               // Some keys within the selectors list are invalid/closed. clear them.
               val allKeys = selector.keys().iterator()
@@ -310,7 +316,8 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf) extends Loggi
           }
 
         if (selectedKeysCount == 0) {
-          logDebug("Selector selected " + selectedKeysCount + " of " + selector.keys.size + " keys")
+          logDebug("Selector selected " + selectedKeysCount + " of " + selector.keys.size +
+            " keys")
         }
         if (selectorThread.isInterrupted) {
           logInfo("Selector thread was interrupted!")
@@ -341,7 +348,8 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf) extends Loggi
                 throw new CancelledKeyException()
               }
             } catch {
-              // weird, but we saw this happening - even though key.isValid was true, key.isAcceptable would throw CancelledKeyException.
+              // weird, but we saw this happening - even though key.isValid was true,
+              // key.isAcceptable would throw CancelledKeyException.
               case e: CancelledKeyException => {
                 logInfo("key already cancelled ? " + key, e)
                 triggerForceCloseByException(key, e)
@@ -437,9 +445,10 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf) extends Loggi
         assert (sendingConnectionManagerId == remoteConnectionManagerId)
 
         messageStatuses.synchronized {
-          for (s <- messageStatuses.values if s.connectionManagerId == sendingConnectionManagerId) {
-            logInfo("Notifying " + s)
-            s.synchronized {
+          for (s <- messageStatuses.values if
+            s.connectionManagerId == sendingConnectionManagerId) {
+              logInfo("Notifying " + s)
+              s.synchronized {
               s.attempted = true
               s.acked = false
               s.markDone()
@@ -458,7 +467,8 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf) extends Loggi
   }
 
   def handleConnectionError(connection: Connection, e: Exception) {
-    logInfo("Handling connection error on connection to " + connection.getRemoteConnectionManagerId())
+    logInfo("Handling connection error on connection to " +
+      connection.getRemoteConnectionManagerId())
     removeConnection(connection)
   }
 
@@ -495,7 +505,8 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf) extends Loggi
                 status
               }
               case None => {
-                throw new Exception("Could not find reference for received ack message " + message.id)
+                throw new Exception("Could not find reference for received ack message " +
+                  message.id)
                 null
               }
             }
@@ -517,7 +528,8 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf) extends Loggi
 
           if (ackMessage.isDefined) {
             if (!ackMessage.get.isInstanceOf[BufferMessage]) {
-              logDebug("Response to " + bufferMessage + " is not a buffer message, it is of type " + ackMessage.get.getClass())
+              logDebug("Response to " + bufferMessage + " is not a buffer message, it is of type "
+                + ackMessage.get.getClass())
             } else if (!ackMessage.get.asInstanceOf[BufferMessage].hasAckId) {
               logDebug("Response to " + bufferMessage + " does not have ack id set")
               ackMessage.get.asInstanceOf[BufferMessage].ackId = bufferMessage.id
@@ -535,14 +547,16 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf) extends Loggi
 
   private def sendMessage(connectionManagerId: ConnectionManagerId, message: Message) {
     def startNewConnection(): SendingConnection = {
-      val inetSocketAddress = new InetSocketAddress(connectionManagerId.host, connectionManagerId.port)
+      val inetSocketAddress = new InetSocketAddress(connectionManagerId.host,
+        connectionManagerId.port)
       val newConnection = new SendingConnection(inetSocketAddress, selector, connectionManagerId)
       registerRequests.enqueue(newConnection)
 
       newConnection
     }
-    // I removed the lookupKey stuff as part of merge ... should I re-add it ? We did not find it useful in our test-env ...
-    // If we do re-add it, we should consistently use it everywhere I guess ?
+    // I removed the lookupKey stuff as part of merge ... should I re-add it ? We did not find it
+    // useful in our test-env ... If we do re-add it, we should consistently use it everywhere I
+    // guess ?
     val connection = connectionsById.getOrElseUpdate(connectionManagerId, startNewConnection())
     message.senderAddress = id.toSocketAddress()
     logDebug("Sending [" + message + "] to [" + connectionManagerId + "]")
@@ -558,15 +572,17 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf) extends Loggi
   def sendMessageReliably(connectionManagerId: ConnectionManagerId, message: Message)
       : Future[Option[Message]] = {
     val promise = Promise[Option[Message]]
-    val status = new MessageStatus(message, connectionManagerId, s => promise.success(s.ackMessage))
-    messageStatuses.synchronized {
+    val status = new MessageStatus(
+      message, connectionManagerId, s => promise.success(s.ackMessage))
+      messageStatuses.synchronized {
       messageStatuses += ((message.id, status))
     }
     sendMessage(connectionManagerId, message)
     promise.future
   }
 
-  def sendMessageReliablySync(connectionManagerId: ConnectionManagerId, message: Message): Option[Message] = {
+  def sendMessageReliablySync(connectionManagerId: ConnectionManagerId,
+      message: Message): Option[Message] = {
     Await.result(sendMessageReliably(connectionManagerId, message), Duration.Inf)
   }
 
@@ -656,7 +672,8 @@ private[spark] object ConnectionManager {
     val tput = mb * 1000.0 / ms
     println("--------------------------")
     println("Started at " + startTime + ", finished at " + finishTime)
-    println("Sent " + count + " messages of size " + size + " in " + ms + " ms (" + tput + " MB/s)")
+    println("Sent " + count + " messages of size " + size + " in " + ms + " ms " +
+      "(" + tput + " MB/s)")
     println("--------------------------")
     println()
   }
@@ -667,7 +684,8 @@ private[spark] object ConnectionManager {
     println("--------------------------")
     val size = 10 * 1024 * 1024
     val count = 10
-    val buffers = Array.tabulate(count)(i => ByteBuffer.allocate(size * (i + 1)).put(Array.tabulate[Byte](size * (i + 1))(x => x.toByte)))
+    val buffers = Array.tabulate(count)(i => ByteBuffer.allocate(size * (i + 1)).put(
+      Array.tabulate[Byte](size * (i + 1))(x => x.toByte)))
     buffers.foreach(_.flip)
     val mb = buffers.map(_.remaining).reduceLeft(_ + _) / 1024.0 / 1024.0
 

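Most of the ConnectionManager hunks are plain line wraps; the one worth noting is splitting a declaration
before `with`, which the Scala parser accepts because `with` cannot begin a new statement, so no
semicolon is inferred at the break. A tiny sketch of that wrap using the same 2.10-era SynchronizedMap
mixin; the object, key, and value are illustrative.

import scala.collection.mutable.{HashMap, SynchronizedMap}

object SynchronizedMapWrapSketch {
  // Breaking before `with` keeps the declaration under the line-length limit
  // without changing the type that is constructed.
  private val connectionsById = new HashMap[String, Int]
    with SynchronizedMap[String, Int]

  def main(args: Array[String]): Unit = {
    connectionsById("conn-1") = 42
    println(connectionsById.get("conn-1"))
  }
}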
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/network/ConnectionManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/ConnectionManagerTest.scala b/core/src/main/scala/org/apache/spark/network/ConnectionManagerTest.scala
index 4f5742d..820045a 100644
--- a/core/src/main/scala/org/apache/spark/network/ConnectionManagerTest.scala
+++ b/core/src/main/scala/org/apache/spark/network/ConnectionManagerTest.scala
@@ -30,14 +30,14 @@ import scala.concurrent.duration._
 
 private[spark] object ConnectionManagerTest extends Logging{
   def main(args: Array[String]) {
-    //<mesos cluster> - the master URL
-    //<slaves file> - a list slaves to run connectionTest on
-    //[num of tasks] - the number of parallel tasks to be initiated default is number of slave hosts
-    //[size of msg in MB (integer)] - the size of messages to be sent in each task, default is 10
-    //[count] - how many times to run, default is 3
-    //[await time in seconds] : await time (in seconds), default is 600
+    // <mesos cluster> - the master URL <slaves file> - a list slaves to run connectionTest on
+    // [num of tasks] - the number of parallel tasks to be initiated default is number of slave
+    // hosts [size of msg in MB (integer)] - the size of messages to be sent in each task,
+    // default is 10 [count] - how many times to run, default is 3 [await time in seconds] :
+    // await time (in seconds), default is 600
     if (args.length < 2) {
-      println("Usage: ConnectionManagerTest <mesos cluster> <slaves file> [num of tasks] [size of msg in MB (integer)] [count] [await time in seconds)] ")
+      println("Usage: ConnectionManagerTest <mesos cluster> <slaves file> [num of tasks] " +
+        "[size of msg in MB (integer)] [count] [await time in seconds)] ")
       System.exit(1)
     }
     
@@ -56,7 +56,8 @@ private[spark] object ConnectionManagerTest extends Logging{
     val size = ( if (args.length > 3) (args(3).toInt) else 10 ) * 1024 * 1024 
     val count = if (args.length > 4) args(4).toInt else 3
     val awaitTime = (if (args.length > 5) args(5).toInt else 600 ).second
-    println("Running "+count+" rounds of test: " + "parallel tasks = " + tasknum + ", msg size = " + size/1024/1024 + " MB, awaitTime = " + awaitTime)
+    println("Running " + count + " rounds of test: " + "parallel tasks = " + tasknum + ", " +
+      "msg size = " + size/1024/1024 + " MB, awaitTime = " + awaitTime)
     val slaveConnManagerIds = sc.parallelize(0 until tasknum, tasknum).map(
         i => SparkEnv.get.connectionManager.id).collect()
     println("\nSlave ConnectionManagerIds")
@@ -76,7 +77,8 @@ private[spark] object ConnectionManagerTest extends Logging{
         buffer.flip
         
         val startTime = System.currentTimeMillis  
-        val futures = slaveConnManagerIds.filter(_ != thisConnManagerId).map(slaveConnManagerId => {
+        val futures = slaveConnManagerIds.filter(_ != thisConnManagerId).map(slaveConnManagerId =>
+        {
           val bufferMessage = Message.createBufferMessage(buffer.duplicate)
           logInfo("Sending [" + bufferMessage + "] to [" + slaveConnManagerId + "]")
           connManager.sendMessageReliably(slaveConnManagerId, bufferMessage)
@@ -87,7 +89,8 @@ private[spark] object ConnectionManagerTest extends Logging{
         
         val mb = size * results.size / 1024.0 / 1024.0
         val ms = finishTime - startTime
-        val resultStr = thisConnManagerId + " Sent " + mb + " MB in " + ms + " ms at " + (mb / ms * 1000.0) + " MB/s"
+        val resultStr = thisConnManagerId + " Sent " + mb + " MB in " + ms + " ms at " + (mb / ms *
+          1000.0) + " MB/s"
         logInfo(resultStr)
         resultStr
       }).collect()

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/network/SenderTest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/SenderTest.scala b/core/src/main/scala/org/apache/spark/network/SenderTest.scala
index dcbd183..9e03956 100644
--- a/core/src/main/scala/org/apache/spark/network/SenderTest.scala
+++ b/core/src/main/scala/org/apache/spark/network/SenderTest.scala
@@ -52,17 +52,20 @@ private[spark] object SenderTest {
       val dataMessage = Message.createBufferMessage(buffer.duplicate)
       val startTime = System.currentTimeMillis
       /*println("Started timer at " + startTime)*/
-      val responseStr = manager.sendMessageReliablySync(targetConnectionManagerId, dataMessage) match {
-        case Some(response) =>
-          val buffer = response.asInstanceOf[BufferMessage].buffers(0)
-          new String(buffer.array)
-        case None => "none"
-      }
+      val responseStr =
+          manager.sendMessageReliablySync(targetConnectionManagerId, dataMessage) match {
+              case Some(response) =>
+                  val buffer = response.asInstanceOf[BufferMessage].buffers(0)
+                  new String(buffer.array)
+              case None => "none"
+          }
       val finishTime = System.currentTimeMillis
       val mb = size / 1024.0 / 1024.0
       val ms = finishTime - startTime
-      /*val resultStr = "Sent " + mb + " MB " + targetServer + " in " + ms + " ms at " + (mb / ms * 1000.0) + " MB/s"*/
-      val resultStr = "Sent " + mb + " MB " + targetServer + " in " + ms + " ms (" +  (mb / ms * 1000.0).toInt + "MB/s) | Response = " + responseStr
+      // val resultStr = "Sent " + mb + " MB " + targetServer + " in " + ms + " ms at " + (mb / ms
+      //  * 1000.0) + " MB/s"
+      val resultStr = "Sent " + mb + " MB " + targetServer + " in " + ms + " ms (" +  (mb / ms *
+        1000.0).toInt + "MB/s) | Response = " + responseStr
       println(resultStr)
     })
   }

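The SenderTest change rewraps a single over-long `val ... = ... match` expression (the deeper indentation
is what was committed). A tiny sketch of binding a match result to a val across lines; the object and
sample Option are assumptions.

object WrappedMatchSketch {
  def main(args: Array[String]): Unit = {
    val maybeResponse: Option[String] = Some("pong")
    val responseStr =
      maybeResponse match {
        case Some(response) => response
        case None => "none"
      }
    println(responseStr)
  }
}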
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
index 30e578d..8f9d1d5 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
@@ -60,7 +60,8 @@ class CheckpointRDD[T: ClassTag](sc: SparkContext, val checkpointPath: String)
   checkpointData.get.cpFile = Some(checkpointPath)
 
   override def getPreferredLocations(split: Partition): Seq[String] = {
-    val status = fs.getFileStatus(new Path(checkpointPath, CheckpointRDD.splitIdToFile(split.index)))
+    val status = fs.getFileStatus(new Path(checkpointPath,
+      CheckpointRDD.splitIdToFile(split.index)))
     val locations = fs.getFileBlockLocations(status, 0, status.getLen)
     locations.headOption.toList.flatMap(_.getHosts).filter(_ != "localhost")
   }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
index cefcc3d..42e1ef8 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
@@ -197,8 +197,9 @@ private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanc
 
     // return the next preferredLocation of some partition of the RDD
     def next(): (String, Partition) = {
-      if (it.hasNext)
+      if (it.hasNext) {
         it.next()
+      }
       else {
         it = resetIterator() // ran out of preferred locations, reset and rotate to the beginning
         it.next()
@@ -290,8 +291,10 @@ private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanc
     val r1 = rnd.nextInt(groupArr.size)
     val r2 = rnd.nextInt(groupArr.size)
     val minPowerOfTwo = if (groupArr(r1).size < groupArr(r2).size) groupArr(r1) else groupArr(r2)
-    if (prefPart== None) // if no preferred locations, just use basic power of two
-      return minPowerOfTwo
+    if (prefPart == None) {
+        // if no preferred locations, just use basic power of two
+        return minPowerOfTwo
+    }
 
     val prefPartActual = prefPart.get
 

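The CoalescedRDD edits again just add braces around single-statement if bodies. A toy rendering of the
early-return shape in the second hunk, with made-up group sizes standing in for groupArr; the names and
data are mine, and this is not the real balance-slack logic.

import scala.util.Random

object BracedEarlyReturnSketch {
  // Pick between two randomly sampled groups, falling back to the smaller one
  // ("power of two choices") when there is no preferred group.
  def pickGroup(groupSizes: Array[Int], prefGroup: Option[Int], rnd: Random): Int = {
    val r1 = rnd.nextInt(groupSizes.length)
    val r2 = rnd.nextInt(groupSizes.length)
    val minPowerOfTwo = if (groupSizes(r1) < groupSizes(r2)) r1 else r2
    if (prefGroup == None) {
      // if no preferred group, just use the basic power-of-two choice
      return minPowerOfTwo
    }
    prefGroup.get
  }

  def main(args: Array[String]): Unit =
    println(pickGroup(Array(3, 1, 4, 1, 5), None, new Random(7)))
}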
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
index 688c310..20713b4 100644
--- a/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
@@ -37,8 +37,8 @@ class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable {
   }
 
   /**
-   * Return a [[org.apache.spark.util.StatCounter]] object that captures the mean, variance and count
-   * of the RDD's elements in one operation.
+   * Return a [[org.apache.spark.util.StatCounter]] object that captures the mean, variance and
+   * count of the RDD's elements in one operation.
    */
   def stats(): StatCounter = {
     self.mapPartitions(nums => Iterator(StatCounter(nums))).reduce((a, b) => a.merge(b))

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 3700614..10d519e 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -705,7 +705,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
     }
 
     logDebug("Saving as hadoop file of type (" + keyClass.getSimpleName + ", " +
-      valueClass.getSimpleName+ ")")
+      valueClass.getSimpleName + ")")
 
     val writer = new SparkHadoopWriter(conf)
     writer.preSetup()