You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2014/02/10 07:17:58 UTC

[1/2] Merge pull request #567 from ScrapCodes/style2.

Updated Branches:
  refs/heads/master 2182aa3c5 -> 919bd7f66


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/919bd7f6/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
index 62cfa0a..4dcd0e4 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
@@ -151,8 +151,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
 
   /**
    * Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are
-   * merged using the supplied reduce function. [[org.apache.spark.Partitioner]] is used to control the
-   * partitioning of each RDD.
+   * merged using the supplied reduce function. [[org.apache.spark.Partitioner]] is used to control
+   * the partitioning of each RDD.
    */
   def reduceByKey(func: JFunction2[V, V, V], partitioner: Partitioner): JavaPairDStream[K, V] = {
     dstream.reduceByKey(func, partitioner)
@@ -160,8 +160,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
 
   /**
    * Combine elements of each key in DStream's RDDs using custom function. This is similar to the
-   * combineByKey for RDDs. Please refer to combineByKey in [[org.apache.spark.rdd.PairRDDFunctions]] for more
-   * information.
+   * combineByKey for RDDs. Please refer to combineByKey in
+   * [[org.apache.spark.rdd.PairRDDFunctions]] for more information.
    */
   def combineByKey[C](createCombiner: JFunction[V, C],
       mergeValue: JFunction2[C, V, C],
@@ -175,8 +175,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
 
   /**
    * Combine elements of each key in DStream's RDDs using custom function. This is similar to the
-   * combineByKey for RDDs. Please refer to combineByKey in [[org.apache.spark.rdd.PairRDDFunctions]] for more
-   * information.
+   * combineByKey for RDDs. Please refer to combineByKey in
+   * [[org.apache.spark.rdd.PairRDDFunctions]] for more information.
    */
   def combineByKey[C](createCombiner: JFunction[V, C],
       mergeValue: JFunction2[C, V, C],
@@ -241,7 +241,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
    * @param slideDuration  sliding interval of the window (i.e., the interval after which
    *                       the new DStream will generate RDDs); must be a multiple of this
    *                       DStream's batching interval
-   * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream.
+   * @param partitioner Partitioner for controlling the partitioning of each RDD in the new
+   *                    DStream.
    */
   def groupByKeyAndWindow(
       windowDuration: Duration,
@@ -315,7 +316,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
    * @param slideDuration  sliding interval of the window (i.e., the interval after which
    *                       the new DStream will generate RDDs); must be a multiple of this
    *                       DStream's batching interval
-   * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream.
+   * @param partitioner Partitioner for controlling the partitioning of each RDD in the new
+   *                    DStream.
    */
   def reduceByKeyAndWindow(
       reduceFunc: Function2[V, V, V],
@@ -403,7 +405,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
    * @param slideDuration  sliding interval of the window (i.e., the interval after which
    *                       the new DStream will generate RDDs); must be a multiple of this
    *                       DStream's batching interval
-   * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream.
+   * @param partitioner Partitioner for controlling the partitioning of each RDD in the new
+   *                    DStream.
    * @param filterFunc     function to filter expired key-value pairs;
    *                       only pairs that satisfy the function are retained
    *                       set this to null if you do not want to filter
@@ -479,7 +482,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
    * [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD.
    * @param updateFunc State update function. If `this` function returns None, then
    *                   corresponding state key-value pair will be eliminated.
-   * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream.
+   * @param partitioner Partitioner for controlling the partitioning of each RDD in the new
+   *                    DStream.
    * @tparam S State type
    */
   def updateStateByKey[S](

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/919bd7f6/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index 921b561..2268160 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -65,8 +65,8 @@ class JavaStreamingContext(val ssc: StreamingContext) {
    * @param appName Name to be used when registering with the scheduler
    * @param batchDuration The time interval at which streaming data will be divided into batches
    * @param sparkHome The SPARK_HOME directory on the slave nodes
-   * @param jarFile JAR file containing job code, to ship to cluster. This can be a path on the local
-   *                file system or an HDFS, HTTP, HTTPS, or FTP URL.
+   * @param jarFile JAR file containing job code, to ship to cluster. This can be a path on the
+   *                local file system or an HDFS, HTTP, HTTPS, or FTP URL.
    */
   def this(
       master: String,

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/919bd7f6/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
index 906a16e..903e3f3 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
@@ -114,7 +114,8 @@ class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T])
   }
 
   override def toString() = {
-    "[\n" + currentCheckpointFiles.size + " checkpoint files \n" + currentCheckpointFiles.mkString("\n") + "\n]"
+    "[\n" + currentCheckpointFiles.size + " checkpoint files \n" +
+      currentCheckpointFiles.mkString("\n") + "\n]"
   }
 
   @throws(classOf[IOException])

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/919bd7f6/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
index 2730339..226844c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
@@ -53,7 +53,8 @@ abstract class InputDStream[T: ClassTag] (@transient ssc_ : StreamingContext)
     } else {
       // Time is valid, but check it it is more than lastValidTime
       if (lastValidTime != null && time < lastValidTime) {
-        logWarning("isTimeValid called with " + time + " where as last valid time is " + lastValidTime)
+        logWarning("isTimeValid called with " + time + " where as last valid time is " +
+          lastValidTime)
       }
       lastValidTime = time
       true

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/919bd7f6/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
index ce153f0..0dc6704 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
@@ -80,7 +80,8 @@ abstract class NetworkInputDStream[T: ClassTag](@transient ssc_ : StreamingConte
 
 private[streaming] sealed trait NetworkReceiverMessage
 private[streaming] case class StopReceiver(msg: String) extends NetworkReceiverMessage
-private[streaming] case class ReportBlock(blockId: BlockId, metadata: Any) extends NetworkReceiverMessage
+private[streaming] case class ReportBlock(blockId: BlockId, metadata: Any)
+  extends NetworkReceiverMessage
 private[streaming] case class ReportError(msg: String) extends NetworkReceiverMessage
 
 /**
@@ -202,8 +203,8 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging
   }
 
   /**
-   * Batches objects created by a [[org.apache.spark.streaming.dstream.NetworkReceiver]] and puts them into
-   * appropriately named blocks at regular intervals. This class starts two threads,
+   * Batches objects created by a [[org.apache.spark.streaming.dstream.NetworkReceiver]] and puts
+   * them into appropriately named blocks at regular intervals. This class starts two threads,
    * one to periodically start a new batch and prepare the previous batch of as a block,
    * the other to push the blocks into the block manager.
    */

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/919bd7f6/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
index fb9df2f..f3c58ae 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
@@ -38,11 +38,12 @@ import org.apache.spark.streaming.{Time, Duration}
  * these functions.
  */
 class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)])
-extends Serializable {
+  extends Serializable {
 
   private[streaming] def ssc = self.ssc
 
-  private[streaming] def defaultPartitioner(numPartitions: Int = self.ssc.sc.defaultParallelism) = {
+  private[streaming] def defaultPartitioner(numPartitions: Int = self.ssc.sc.defaultParallelism)
+  = {
     new HashPartitioner(numPartitions)
   }
 
@@ -63,8 +64,8 @@ extends Serializable {
   }
 
   /**
-   * Return a new DStream by applying `groupByKey` on each RDD. The supplied [[org.apache.spark.Partitioner]]
-   * is used to control the partitioning of each RDD.
+   * Return a new DStream by applying `groupByKey` on each RDD. The supplied
+   * [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD.
    */
   def groupByKey(partitioner: Partitioner): DStream[(K, Seq[V])] = {
     val createCombiner = (v: V) => ArrayBuffer[V](v)
@@ -94,8 +95,8 @@ extends Serializable {
 
   /**
    * Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are
-   * merged using the supplied reduce function. [[org.apache.spark.Partitioner]] is used to control the
-   * partitioning of each RDD.
+   * merged using the supplied reduce function. [[org.apache.spark.Partitioner]] is used to control
+   * the partitioning of each RDD.
    */
   def reduceByKey(reduceFunc: (V, V) => V, partitioner: Partitioner): DStream[(K, V)] = {
     val cleanedReduceFunc = ssc.sc.clean(reduceFunc)
@@ -113,7 +114,8 @@ extends Serializable {
     mergeCombiner: (C, C) => C,
     partitioner: Partitioner,
     mapSideCombine: Boolean = true): DStream[(K, C)] = {
-    new ShuffledDStream[K, V, C](self, createCombiner, mergeValue, mergeCombiner, partitioner, mapSideCombine)
+    new ShuffledDStream[K, V, C](self, createCombiner, mergeValue, mergeCombiner, partitioner,
+      mapSideCombine)
   }
 
   /**
@@ -138,7 +140,8 @@ extends Serializable {
    *                       the new DStream will generate RDDs); must be a multiple of this
    *                       DStream's batching interval
    */
-  def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration): DStream[(K, Seq[V])] = {
+  def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration): DStream[(K, Seq[V])] =
+  {
     groupByKeyAndWindow(windowDuration, slideDuration, defaultPartitioner())
   }
 
@@ -170,7 +173,8 @@ extends Serializable {
    * @param slideDuration  sliding interval of the window (i.e., the interval after which
    *                       the new DStream will generate RDDs); must be a multiple of this
    *                       DStream's batching interval
-   * @param partitioner    partitioner for controlling the partitioning of each RDD in the new DStream.
+   * @param partitioner    partitioner for controlling the partitioning of each RDD in the new
+   *                       DStream.
    */
   def groupByKeyAndWindow(
       windowDuration: Duration,
@@ -239,7 +243,8 @@ extends Serializable {
       slideDuration: Duration,
       numPartitions: Int
     ): DStream[(K, V)] = {
-    reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, defaultPartitioner(numPartitions))
+    reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration,
+      defaultPartitioner(numPartitions))
   }
 
   /**
@@ -315,7 +320,8 @@ extends Serializable {
    * @param slideDuration  sliding interval of the window (i.e., the interval after which
    *                       the new DStream will generate RDDs); must be a multiple of this
    *                       DStream's batching interval
-   * @param partitioner    partitioner for controlling the partitioning of each RDD in the new DStream.
+   * @param partitioner    partitioner for controlling the partitioning of each RDD in the new
+   *                       DStream.
    * @param filterFunc     Optional function to filter expired key-value pairs;
    *                       only pairs that satisfy the function are retained
    */
@@ -373,7 +379,8 @@ extends Serializable {
    * [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD.
    * @param updateFunc State update function. If `this` function returns None, then
    *                   corresponding state key-value pair will be eliminated.
-   * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream.
+   * @param partitioner Partitioner for controlling the partitioning of each RDD in the new
+   *                    DStream.
    * @tparam S State type
    */
   def updateStateByKey[S: ClassTag](
@@ -395,7 +402,8 @@ extends Serializable {
    *                   this function may generate a different a tuple with a different key
    *                   than the input key. It is up to the developer to decide whether to
    *                   remember the partitioner despite the key being changed.
-   * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream.
+   * @param partitioner Partitioner for controlling the partitioning of each RDD in the new
+   *                    DStream.
    * @param rememberPartitioner Whether to remember the paritioner object in the generated RDDs.
    * @tparam S State type
    */
@@ -438,7 +446,8 @@ extends Serializable {
    * Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream.
    * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
    */
-  def cogroup[W: ClassTag](other: DStream[(K, W)], numPartitions: Int): DStream[(K, (Seq[V], Seq[W]))] = {
+  def cogroup[W: ClassTag](other: DStream[(K, W)], numPartitions: Int)
+  : DStream[(K, (Seq[V], Seq[W]))] = {
     cogroup(other, defaultPartitioner(numPartitions))
   }
 
@@ -566,7 +575,8 @@ extends Serializable {
       prefix: String,
       suffix: String
     )(implicit fm: ClassTag[F]) {
-    saveAsHadoopFiles(prefix, suffix, getKeyClass, getValueClass, fm.runtimeClass.asInstanceOf[Class[F]])
+    saveAsHadoopFiles(prefix, suffix, getKeyClass, getValueClass,
+      fm.runtimeClass.asInstanceOf[Class[F]])
   }
 
   /**
@@ -580,7 +590,7 @@ extends Serializable {
       valueClass: Class[_],
       outputFormatClass: Class[_ <: OutputFormat[_, _]],
       conf: JobConf = new JobConf
-    ) {  
+    ) {
     val saveFunc = (rdd: RDD[(K, V)], time: Time) => {
       val file = rddToFileName(prefix, suffix, time)
       rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass, conf)
@@ -596,7 +606,8 @@ extends Serializable {
       prefix: String,
       suffix: String
     )(implicit fm: ClassTag[F])  {
-    saveAsNewAPIHadoopFiles(prefix, suffix, getKeyClass, getValueClass, fm.runtimeClass.asInstanceOf[Class[F]])
+    saveAsNewAPIHadoopFiles(prefix, suffix, getKeyClass, getValueClass,
+      fm.runtimeClass.asInstanceOf[Class[F]])
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/919bd7f6/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
index 7a6b1ea..ca0a8ae 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
@@ -87,7 +87,8 @@ class ReducedWindowedDStream[K: ClassTag, V: ClassTag](
     val invReduceF = invReduceFunc
 
     val currentTime = validTime
-    val currentWindow = new Interval(currentTime - windowDuration + parent.slideDuration, currentTime)
+    val currentWindow = new Interval(currentTime - windowDuration + parent.slideDuration,
+      currentTime)
     val previousWindow = currentWindow - slideDuration
 
     logDebug("Window time = " + windowDuration)
@@ -125,7 +126,8 @@ class ReducedWindowedDStream[K: ClassTag, V: ClassTag](
     val allRDDs = new ArrayBuffer[RDD[(K, V)]]() += previousWindowRDD ++= oldRDDs ++= newRDDs
 
     // Cogroup the reduced RDDs and merge the reduced values
-    val cogroupedRDD = new CoGroupedRDD[K](allRDDs.toSeq.asInstanceOf[Seq[RDD[(K, _)]]], partitioner)
+    val cogroupedRDD = new CoGroupedRDD[K](allRDDs.toSeq.asInstanceOf[Seq[RDD[(K, _)]]],
+      partitioner)
     //val mergeValuesFunc = mergeValues(oldRDDs.size, newRDDs.size) _
 
     val numOldValues = oldRDDs.size

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/919bd7f6/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala
index 4ecba03..57429a1 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala
@@ -48,7 +48,8 @@ class UnionDStream[T: ClassTag](parents: Array[DStream[T]])
     val rdds = new ArrayBuffer[RDD[T]]()
     parents.map(_.getOrCompute(validTime)).foreach(_ match {
       case Some(rdd) => rdds += rdd
-      case None => throw new Exception("Could not generate RDD from a parent for unifying at time " + validTime)
+      case None => throw new Exception("Could not generate RDD from a parent for unifying at time "
+        + validTime)
     })
     if (rdds.size > 0) {
       Some(new UnionRDD(ssc.sc, rdds))

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/919bd7f6/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
index 6301772..24289b7 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
@@ -31,13 +31,15 @@ class WindowedDStream[T: ClassTag](
     _slideDuration: Duration)
   extends DStream[T](parent.ssc) {
 
-  if (!_windowDuration.isMultipleOf(parent.slideDuration))
+  if (!_windowDuration.isMultipleOf(parent.slideDuration)) {
     throw new Exception("The window duration of windowed DStream (" + _slideDuration + ") " +
     "must be a multiple of the slide duration of parent DStream (" + parent.slideDuration + ")")
+  }
 
-  if (!_slideDuration.isMultipleOf(parent.slideDuration))
+  if (!_slideDuration.isMultipleOf(parent.slideDuration)) {
     throw new Exception("The slide duration of windowed DStream (" + _slideDuration + ") " +
     "must be a multiple of the slide duration of parent DStream (" + parent.slideDuration + ")")
+  }
 
   // Persist parent level by default, as those RDDs are going to be obviously reused.
   parent.persist(StorageLevel.MEMORY_ONLY_SER)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/919bd7f6/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
index b5f11d3..c730624 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
@@ -46,8 +46,9 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
   }
   private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
     longTime => eventActor ! GenerateJobs(new Time(longTime)))
-  private lazy val checkpointWriter = if (ssc.checkpointDuration != null && ssc.checkpointDir != null) {
-    new CheckpointWriter(this, ssc.conf, ssc.checkpointDir, ssc.sparkContext.hadoopConfiguration)
+  private lazy val checkpointWriter =
+    if (ssc.checkpointDuration != null && ssc.checkpointDir != null) {
+      new CheckpointWriter(this, ssc.conf, ssc.checkpointDir, ssc.sparkContext.hadoopConfiguration)
   } else {
     null
   }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/919bd7f6/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala
index 0d9733f..e4fa163 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala
@@ -34,9 +34,12 @@ import org.apache.spark.streaming.{Time, StreamingContext}
 import org.apache.spark.util.AkkaUtils
 
 private[streaming] sealed trait NetworkInputTrackerMessage
-private[streaming] case class RegisterReceiver(streamId: Int, receiverActor: ActorRef) extends NetworkInputTrackerMessage
-private[streaming] case class AddBlocks(streamId: Int, blockIds: Seq[BlockId], metadata: Any) extends NetworkInputTrackerMessage
-private[streaming] case class DeregisterReceiver(streamId: Int, msg: String) extends NetworkInputTrackerMessage
+private[streaming] case class RegisterReceiver(streamId: Int, receiverActor: ActorRef)
+  extends NetworkInputTrackerMessage
+private[streaming] case class AddBlocks(streamId: Int, blockIds: Seq[BlockId], metadata: Any)
+  extends NetworkInputTrackerMessage
+private[streaming] case class DeregisterReceiver(streamId: Int, msg: String)
+  extends NetworkInputTrackerMessage
 
 /**
  * This class manages the execution of the receivers of NetworkInputDStreams. Instance of
@@ -66,7 +69,8 @@ class NetworkInputTracker(ssc: StreamingContext) extends Logging {
     }
 
     if (!networkInputStreams.isEmpty) {
-      actor = ssc.env.actorSystem.actorOf(Props(new NetworkInputTrackerActor), "NetworkInputTracker")
+      actor = ssc.env.actorSystem.actorOf(Props(new NetworkInputTrackerActor),
+        "NetworkInputTracker")
       receiverExecutor.start()
       logInfo("NetworkInputTracker started")
     }
@@ -102,7 +106,8 @@ class NetworkInputTracker(ssc: StreamingContext) extends Logging {
           throw new Exception("Register received for unexpected id " + streamId)
         }
         receiverInfo += ((streamId, receiverActor))
-        logInfo("Registered receiver for network stream " + streamId + " from " + sender.path.address)
+        logInfo("Registered receiver for network stream " + streamId + " from "
+          + sender.path.address)
         sender ! true
       }
       case AddBlocks(streamId, blockIds, metadata) => {
@@ -153,12 +158,14 @@ class NetworkInputTracker(ssc: StreamingContext) extends Logging {
       })
 
       // Right now, we only honor preferences if all receivers have them
-      val hasLocationPreferences = receivers.map(_.getLocationPreference().isDefined).reduce(_ && _)
+      val hasLocationPreferences = receivers.map(_.getLocationPreference().isDefined)
+        .reduce(_ && _)
 
       // Create the parallel collection of receivers to distributed them on the worker nodes
       val tempRDD =
         if (hasLocationPreferences) {
-          val receiversWithPreferences = receivers.map(r => (r, Seq(r.getLocationPreference().toString)))
+          val receiversWithPreferences =
+            receivers.map(r => (r, Seq(r.getLocationPreference().toString)))
           ssc.sc.makeRDD[NetworkReceiver[_]](receiversWithPreferences)
         }
         else {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/919bd7f6/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
index 3063cf1..18811fc 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
@@ -23,7 +23,8 @@ import java.util.concurrent.LinkedBlockingQueue
 
 /** Asynchronously passes StreamingListenerEvents to registered StreamingListeners. */
 private[spark] class StreamingListenerBus() extends Logging {
-  private val listeners = new ArrayBuffer[StreamingListener]() with SynchronizedBuffer[StreamingListener]
+  private val listeners = new ArrayBuffer[StreamingListener]()
+    with SynchronizedBuffer[StreamingListener]
 
   /* Cap the capacity of the SparkListenerEvent queue so we get an explicit error (rather than
    * an OOM exception) if it's perpetually being added to more quickly than it's being drained. */

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/919bd7f6/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala
index 6a45bc2..2bb616c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala
@@ -407,10 +407,11 @@ class FileGeneratingThread(input: Seq[String], testDir: Path, interval: Long)
             }
           }
         }
-    if (!done)
+        if (!done) {
           logError("Could not generate file " + hadoopFile)
-        else
+        } else {
           logInfo("Generated file " + hadoopFile + " at " + System.currentTimeMillis)
+        }
         Thread.sleep(interval)
         localFile.delete()
       }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/919bd7f6/streaming/src/main/scala/org/apache/spark/streaming/util/RateLimitedOutputStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/RateLimitedOutputStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/RateLimitedOutputStream.scala
index 179fd75..2b8cdb7 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/RateLimitedOutputStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/RateLimitedOutputStream.scala
@@ -71,8 +71,12 @@ class RateLimitedOutputStream(out: OutputStream, bytesPerSec: Int) extends Outpu
       }
     } else {
       // Calculate how much time we should sleep to bring ourselves to the desired rate.
-      // Based on throttler in Kafka (https://github.com/kafka-dev/kafka/blob/master/core/src/main/scala/kafka/utils/Throttler.scala)
-      val sleepTime = MILLISECONDS.convert((bytesWrittenSinceSync / bytesPerSec - elapsedSecs), SECONDS)
+      // Based on throttler in Kafka
+      // scalastyle:off
+      // (https://github.com/kafka-dev/kafka/blob/master/core/src/main/scala/kafka/utils/Throttler.scala)
+      // scalastyle:on
+      val sleepTime = MILLISECONDS.convert((bytesWrittenSinceSync / bytesPerSec - elapsedSecs),
+        SECONDS)
       if (sleepTime > 0) Thread.sleep(sleepTime)
       waitToWrite(numBytes)
     }


[2/2] git commit: Merge pull request #567 from ScrapCodes/style2.

Posted by rx...@apache.org.
Merge pull request #567 from ScrapCodes/style2.

SPARK-1058, Fix Style Errors and Add Scala Style to Spark Build. Pt 2

Continuation of PR #557

With this, all Scala style errors are fixed across the code base!

The reason for creating a separate PR was to avoid interrupting an already-reviewed, ready-to-merge PR. Hope this gets reviewed soon and merged too.

Author: Prashant Sharma <pr...@imaginea.com>

Closes #567 and squashes the following commits:

3b1ec30 [Prashant Sharma] scala style fixes


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/919bd7f6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/919bd7f6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/919bd7f6

Branch: refs/heads/master
Commit: 919bd7f669c61500eee7231298d9880b320eb6f3
Parents: 2182aa3
Author: Prashant Sharma <pr...@imaginea.com>
Authored: Sun Feb 9 22:17:52 2014 -0800
Committer: Reynold Xin <rx...@apache.org>
Committed: Sun Feb 9 22:17:52 2014 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/bagel/Bagel.scala    |  3 +-
 .../scala/org/apache/spark/CacheManager.scala   |  2 +-
 .../spark/deploy/client/AppClientListener.scala |  3 +-
 .../org/apache/spark/deploy/master/Master.scala |  6 +-
 .../spark/deploy/master/ui/IndexPage.scala      |  3 +-
 .../spark/deploy/worker/CommandUtils.scala      |  2 +-
 .../spark/network/ConnectionManager.scala       | 20 +++--
 .../spark/network/ConnectionManagerTest.scala   | 13 +--
 .../org/apache/spark/network/SenderTest.scala   | 17 ++--
 .../org/apache/spark/rdd/CoalescedRDD.scala     |  9 +-
 .../spark/rdd/ParallelCollectionRDD.scala       |  4 +-
 .../org/apache/spark/scheduler/ResultTask.scala |  8 +-
 .../apache/spark/scheduler/SparkListener.scala  | 18 ++--
 .../org/apache/spark/storage/BlockManager.scala |  2 +-
 .../spark/storage/BlockMessageArray.scala       |  2 +-
 .../scala/org/apache/spark/ui/JettyUtils.scala  |  6 +-
 .../scala/org/apache/spark/ui/UIUtils.scala     |  7 +-
 .../org/apache/spark/ui/jobs/PoolTable.scala    |  5 +-
 .../org/apache/spark/ui/jobs/StagePage.scala    | 19 ++--
 .../org/apache/spark/ui/jobs/StageTable.scala   |  4 +-
 .../org/apache/spark/util/ClosureCleaner.scala  |  4 +-
 .../org/apache/spark/util/SizeEstimator.scala   |  2 +-
 .../apache/spark/examples/BroadcastTest.scala   | 14 +--
 .../apache/spark/examples/CassandraTest.scala   | 92 ++++++++++----------
 .../spark/examples/ExceptionHandlingTest.scala  |  3 +-
 .../org/apache/spark/examples/GroupByTest.scala |  7 +-
 .../org/apache/spark/examples/LocalFileLR.scala |  2 +-
 .../org/apache/spark/examples/LocalKMeans.scala |  6 +-
 .../org/apache/spark/examples/LogQuery.scala    |  4 +-
 .../spark/examples/SkewedGroupByTest.scala      |  3 +-
 .../org/apache/spark/examples/SparkHdfsLR.scala |  3 +-
 .../spark/examples/bagel/PageRankUtils.scala    | 15 ++--
 .../examples/bagel/WikipediaPageRank.scala      | 17 ++--
 .../bagel/WikipediaPageRankStandalone.scala     | 22 +++--
 .../streaming/examples/ActorWordCount.scala     |  2 +-
 .../streaming/examples/KafkaWordCount.scala     |  4 +-
 .../streaming/examples/MQTTWordCount.scala      |  7 +-
 .../streaming/examples/NetworkWordCount.scala   |  2 +
 .../examples/RecoverableNetworkWordCount.scala  | 21 +++--
 .../streaming/examples/TwitterAlgebirdCMS.scala | 20 +++--
 .../streaming/examples/TwitterAlgebirdHLL.scala |  3 +-
 .../streaming/examples/ZeroMQWordCount.scala    |  5 +-
 .../scala/org/apache/spark/graphx/EdgeRDD.scala |  3 +-
 project/project/SparkPluginBuild.scala          |  4 +-
 scalastyle-config.xml                           | 19 +++-
 .../org/apache/spark/streaming/Checkpoint.scala | 12 ++-
 .../apache/spark/streaming/DStreamGraph.scala   |  3 +-
 .../org/apache/spark/streaming/Interval.scala   |  3 +-
 .../streaming/api/java/JavaDStreamLike.scala    | 12 +--
 .../streaming/api/java/JavaPairDStream.scala    | 24 ++---
 .../api/java/JavaStreamingContext.scala         |  4 +-
 .../dstream/DStreamCheckpointData.scala         |  3 +-
 .../spark/streaming/dstream/InputDStream.scala  |  3 +-
 .../streaming/dstream/NetworkInputDStream.scala |  7 +-
 .../dstream/PairDStreamFunctions.scala          | 45 ++++++----
 .../dstream/ReducedWindowedDStream.scala        |  6 +-
 .../spark/streaming/dstream/UnionDStream.scala  |  3 +-
 .../streaming/dstream/WindowedDStream.scala     |  6 +-
 .../streaming/scheduler/JobGenerator.scala      |  5 +-
 .../scheduler/NetworkInputTracker.scala         | 21 +++--
 .../scheduler/StreamingListenerBus.scala        |  3 +-
 .../streaming/util/MasterFailureTest.scala      |  5 +-
 .../util/RateLimitedOutputStream.scala          |  8 +-
 63 files changed, 356 insertions(+), 254 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/919bd7f6/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala
----------------------------------------------------------------------
diff --git a/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala b/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala
index 2812166..dd3eed8 100644
--- a/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala
+++ b/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala
@@ -33,7 +33,8 @@ object Bagel extends Logging {
    * @param messages initial set of messages represented as an RDD of (Key, Message) pairs. Often
    *                 this will be an empty array, i.e. sc.parallelize(Array[K, Message]()).
    * @param combiner [[org.apache.spark.bagel.Combiner]] combines multiple individual messages to a
-   *                 given vertex into one message before sending (which often involves network I/O).
+   *                 given vertex into one message before sending (which often involves network
+   *                 I/O).
    * @param aggregator [[org.apache.spark.bagel.Aggregator]] performs a reduce across all vertices
    *                  after each superstep and provides the result to each vertex in the next
    *                  superstep.

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/919bd7f6/core/src/main/scala/org/apache/spark/CacheManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala
index 15a0d24..b38af24 100644
--- a/core/src/main/scala/org/apache/spark/CacheManager.scala
+++ b/core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -32,7 +32,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
 
   /** Gets or computes an RDD split. Used by RDD.iterator() when an RDD is cached. */
   def getOrCompute[T](rdd: RDD[T], split: Partition, context: TaskContext,
-    storageLevel: StorageLevel): Iterator[T] = {
+      storageLevel: StorageLevel): Iterator[T] = {
     val key = RDDBlockId(rdd.id, split.index)
     logDebug("Looking for partition " + key)
     blockManager.get(key) match {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/919bd7f6/core/src/main/scala/org/apache/spark/deploy/client/AppClientListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClientListener.scala b/core/src/main/scala/org/apache/spark/deploy/client/AppClientListener.scala
index 2f2cbd1..1f20aa3 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/AppClientListener.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClientListener.scala
@@ -33,8 +33,7 @@ private[spark] trait AppClientListener {
   /** Dead means that we couldn't find any Masters to connect to, and have given up. */
   def dead(): Unit
 
-  def executorAdded(
-      fullId: String, workerId: String, hostPort: String, cores: Int, memory: Int): Unit
+  def executorAdded(fullId: String, workerId: String, hostPort: String, cores: Int, memory: Int)
 
   def executorRemoved(fullId: String, message: String, exitStatus: Option[Int]): Unit
 }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/919bd7f6/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 82bf655..0bb9a9a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -166,8 +166,8 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
       System.exit(0)
     }
 
-    case RegisterWorker(id, workerHost, workerPort, cores, memory, workerWebUiPort, publicAddress)
-    => {
+    case RegisterWorker(id, workerHost, workerPort, cores, memory, workerUiPort, publicAddress) =>
+    {
       logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
         host, workerPort, cores, Utils.megabytesToString(memory)))
       if (state == RecoveryState.STANDBY) {
@@ -176,7 +176,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
         sender ! RegisterWorkerFailed("Duplicate worker ID")
       } else {
         val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
-          sender, workerWebUiPort, publicAddress)
+          sender, workerUiPort, publicAddress)
         if (registerWorker(worker)) {
           persistenceEngine.addWorker(worker)
           sender ! RegisteredWorker(masterUrl, masterWebUiUrl)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/919bd7f6/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
index 64ecf22..04f9a22 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
@@ -123,7 +123,8 @@ private[spark] class IndexPage(parent: MasterWebUI) {
         </div>
 
         <div>
-          {if (hasDrivers) {
+          {
+            if (hasDrivers) {
               <div class="row-fluid">
                 <div class="span12">
                   <h4> Completed Drivers </h4>

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/919bd7f6/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
index f411eb9..2ceccc7 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
@@ -50,7 +50,7 @@ object CommandUtils extends Logging {
       .map(p => List("-Djava.library.path=" + p))
       .getOrElse(Nil)
     val workerLocalOpts = Option(getenv("SPARK_JAVA_OPTS"))
-            .map(Utils.splitCommandString).getOrElse(Nil)
+      .map(Utils.splitCommandString).getOrElse(Nil)
     val userOpts = getEnv("SPARK_JAVA_OPTS", command).map(Utils.splitCommandString).getOrElse(Nil)
     val memoryOpts = Seq(s"-Xms${memory}M", s"-Xmx${memory}M")
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/919bd7f6/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
index 24d0a7d..a78d6ac 100644
--- a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
+++ b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
@@ -74,8 +74,8 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf) extends Loggi
     new LinkedBlockingDeque[Runnable]())
 
   private val serverChannel = ServerSocketChannel.open()
-  private val connectionsByKey = new HashMap[SelectionKey, Connection]
-    with SynchronizedMap[SelectionKey, Connection]
+  private val connectionsByKey =
+    new HashMap[SelectionKey, Connection] with SynchronizedMap[SelectionKey, Connection]
   private val connectionsById = new HashMap[ConnectionManagerId, SendingConnection]
     with SynchronizedMap[ConnectionManagerId, SendingConnection]
   private val messageStatuses = new HashMap[Int, MessageStatus]
@@ -445,10 +445,9 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf) extends Loggi
         assert (sendingConnectionManagerId == remoteConnectionManagerId)
 
         messageStatuses.synchronized {
-          for (s <- messageStatuses.values if
-            s.connectionManagerId == sendingConnectionManagerId) {
-              logInfo("Notifying " + s)
-              s.synchronized {
+          for (s <- messageStatuses.values if s.connectionManagerId == sendingConnectionManagerId) {
+            logInfo("Notifying " + s)
+            s.synchronized {
               s.attempted = true
               s.acked = false
               s.markDone()
@@ -574,7 +573,7 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf) extends Loggi
     val promise = Promise[Option[Message]]
     val status = new MessageStatus(
       message, connectionManagerId, s => promise.success(s.ackMessage))
-      messageStatuses.synchronized {
+    messageStatuses.synchronized {
       messageStatuses += ((message.id, status))
     }
     sendMessage(connectionManagerId, message)
@@ -684,8 +683,11 @@ private[spark] object ConnectionManager {
     println("--------------------------")
     val size = 10 * 1024 * 1024
     val count = 10
-    val buffers = Array.tabulate(count)(i => ByteBuffer.allocate(size * (i + 1)).put(
-      Array.tabulate[Byte](size * (i + 1))(x => x.toByte)))
+    val buffers = Array.tabulate(count) { i =>
+      val bufferLen = size * (i + 1)
+      val bufferContent = Array.tabulate[Byte](bufferLen)(x => x.toByte)
+      ByteBuffer.allocate(bufferLen).put(bufferContent)
+    }
     buffers.foreach(_.flip)
     val mb = buffers.map(_.remaining).reduceLeft(_ + _) / 1024.0 / 1024.0
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/919bd7f6/core/src/main/scala/org/apache/spark/network/ConnectionManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/ConnectionManagerTest.scala b/core/src/main/scala/org/apache/spark/network/ConnectionManagerTest.scala
index 820045a..8e5c529 100644
--- a/core/src/main/scala/org/apache/spark/network/ConnectionManagerTest.scala
+++ b/core/src/main/scala/org/apache/spark/network/ConnectionManagerTest.scala
@@ -77,12 +77,13 @@ private[spark] object ConnectionManagerTest extends Logging{
         buffer.flip
         
         val startTime = System.currentTimeMillis  
-        val futures = slaveConnManagerIds.filter(_ != thisConnManagerId).map(slaveConnManagerId =>
-        {
-          val bufferMessage = Message.createBufferMessage(buffer.duplicate)
-          logInfo("Sending [" + bufferMessage + "] to [" + slaveConnManagerId + "]")
-          connManager.sendMessageReliably(slaveConnManagerId, bufferMessage)
-        })
+        val futures = slaveConnManagerIds.filter(_ != thisConnManagerId).map{ slaveConnManagerId =>
+          {
+            val bufferMessage = Message.createBufferMessage(buffer.duplicate)
+            logInfo("Sending [" + bufferMessage + "] to [" + slaveConnManagerId + "]")
+            connManager.sendMessageReliably(slaveConnManagerId, bufferMessage)
+          }
+        }
         val results = futures.map(f => Await.result(f, awaitTime))
         val finishTime = System.currentTimeMillis
         Thread.sleep(5000)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/919bd7f6/core/src/main/scala/org/apache/spark/network/SenderTest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/SenderTest.scala b/core/src/main/scala/org/apache/spark/network/SenderTest.scala
index 9e03956..162d49b 100644
--- a/core/src/main/scala/org/apache/spark/network/SenderTest.scala
+++ b/core/src/main/scala/org/apache/spark/network/SenderTest.scala
@@ -52,20 +52,19 @@ private[spark] object SenderTest {
       val dataMessage = Message.createBufferMessage(buffer.duplicate)
       val startTime = System.currentTimeMillis
       /*println("Started timer at " + startTime)*/
-      val responseStr =
-          manager.sendMessageReliablySync(targetConnectionManagerId, dataMessage) match {
-              case Some(response) =>
-                  val buffer = response.asInstanceOf[BufferMessage].buffers(0)
-                  new String(buffer.array)
-              case None => "none"
-          }
+      val responseStr = manager.sendMessageReliablySync(targetConnectionManagerId, dataMessage)
+        .map { response =>
+          val buffer = response.asInstanceOf[BufferMessage].buffers(0)
+          new String(buffer.array)
+        }.getOrElse("none")
+
       val finishTime = System.currentTimeMillis
       val mb = size / 1024.0 / 1024.0
       val ms = finishTime - startTime
       // val resultStr = "Sent " + mb + " MB " + targetServer + " in " + ms + " ms at " + (mb / ms
       //  * 1000.0) + " MB/s"
-      val resultStr = "Sent " + mb + " MB " + targetServer + " in " + ms + " ms (" +  (mb / ms *
-        1000.0).toInt + "MB/s) | Response = " + responseStr
+      val resultStr = "Sent " + mb + " MB " + targetServer + " in " + ms + " ms (" +
+        (mb / ms * 1000.0).toInt + "MB/s) | Response = " + responseStr
       println(resultStr)
     })
   }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/919bd7f6/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
index 42e1ef8..dc345b2 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
@@ -199,8 +199,7 @@ private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanc
     def next(): (String, Partition) = {
       if (it.hasNext) {
         it.next()
-      }
-      else {
+      } else {
         it = resetIterator() // ran out of preferred locations, reset and rotate to the beginning
         it.next()
       }
@@ -291,9 +290,9 @@ private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanc
     val r1 = rnd.nextInt(groupArr.size)
     val r2 = rnd.nextInt(groupArr.size)
     val minPowerOfTwo = if (groupArr(r1).size < groupArr(r2).size) groupArr(r1) else groupArr(r2)
-    if (prefPart == None) {
-        // if no preferred locations, just use basic power of two
-        return minPowerOfTwo
+    if (prefPart.isEmpty) {
+      // if no preferred locations, just use basic power of two
+      return minPowerOfTwo
     }
 
     val prefPartActual = prefPart.get

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/919bd7f6/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala
index 56c7777..f270c1a 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala
@@ -39,8 +39,8 @@ private[spark] class ParallelCollectionPartition[T: ClassTag](
   override def hashCode(): Int = (41 * (41 + rddId) + slice).toInt
 
   override def equals(other: Any): Boolean = other match {
-    case that: ParallelCollectionPartition[_] => (this.rddId == that.rddId &&
-            this.slice == that.slice)
+    case that: ParallelCollectionPartition[_] =>
+      this.rddId == that.rddId && this.slice == that.slice
     case _ => false
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/919bd7f6/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
index 0544f81..77b1682 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
@@ -36,8 +36,8 @@ private[spark] object ResultTask {
   val metadataCleaner = new MetadataCleaner(
     MetadataCleanerType.RESULT_TASK, serializedInfoCache.clearOldValues, new SparkConf)
 
-  def serializeInfo(stageId: Int, rdd: RDD[_], func: (TaskContext, Iterator[_]) => _)
-  : Array[Byte] = {
+  def serializeInfo(stageId: Int, rdd: RDD[_], func: (TaskContext, Iterator[_]) => _): Array[Byte] =
+  {
     synchronized {
       val old = serializedInfoCache.get(stageId).orNull
       if (old != null) {
@@ -56,8 +56,8 @@ private[spark] object ResultTask {
     }
   }
 
-  def deserializeInfo(stageId: Int, bytes: Array[Byte])
-  : (RDD[_], (TaskContext, Iterator[_]) => _) = {
+  def deserializeInfo(stageId: Int, bytes: Array[Byte]): (RDD[_], (TaskContext, Iterator[_]) => _) =
+  {
     val loader = Thread.currentThread.getContextClassLoader
     val in = new GZIPInputStream(new ByteArrayInputStream(bytes))
     val ser = SparkEnv.get.closureSerializer.newInstance()

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/919bd7f6/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index d25f0a6..129153c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -100,13 +100,13 @@ class StatsReportListener extends SparkListener with Logging {
 
     //shuffle write
     showBytesDistribution("shuffle bytes written:",
-      (_,metric) => metric.shuffleWriteMetrics.map{_.shuffleBytesWritten})
+      (_,metric) => metric.shuffleWriteMetrics.map(_.shuffleBytesWritten))
 
     //fetch & io
     showMillisDistribution("fetch wait time:",
-      (_, metric) => metric.shuffleReadMetrics.map{_.fetchWaitTime})
+      (_, metric) => metric.shuffleReadMetrics.map(_.fetchWaitTime))
     showBytesDistribution("remote bytes read:",
-      (_, metric) => metric.shuffleReadMetrics.map{_.remoteBytesRead})
+      (_, metric) => metric.shuffleReadMetrics.map(_.remoteBytesRead))
     showBytesDistribution("task result size:", (_, metric) => Some(metric.resultSize))
 
     //runtime breakdown
@@ -152,8 +152,8 @@ private[spark] object StatsReportListener extends Logging {
     logInfo("\t" + quantiles.mkString("\t"))
   }
 
-  def showDistribution(heading: String,
-      dOpt: Option[Distribution], formatNumber: Double => String) {
+  def showDistribution(heading: String, dOpt: Option[Distribution], formatNumber: Double => String)
+  {
     dOpt.foreach { d => showDistribution(heading, d, formatNumber)}
   }
 
@@ -162,9 +162,11 @@ private[spark] object StatsReportListener extends Logging {
     showDistribution(heading, dOpt, f _)
   }
 
-  def showDistribution(heading:String, format: String,
-      getMetric: (TaskInfo,TaskMetrics) => Option[Double])
-    (implicit stage: SparkListenerStageCompleted) {
+  def showDistribution(
+      heading: String,
+      format: String,
+      getMetric: (TaskInfo, TaskMetrics) => Option[Double])
+      (implicit stage: SparkListenerStageCompleted) {
     showDistribution(heading, extractDoubleDistribution(stage, getMetric), format)
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/919bd7f6/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 542deb9..780a3a1 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -291,7 +291,7 @@ private[spark] class BlockManager(
           throw new Exception("Block " + blockId + " not found on disk, though it should be")
       }
     } else {
-        doGetLocal(blockId, asValues = false).asInstanceOf[Option[ByteBuffer]]
+      doGetLocal(blockId, asValues = false).asInstanceOf[Option[ByteBuffer]]
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/919bd7f6/core/src/main/scala/org/apache/spark/storage/BlockMessageArray.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockMessageArray.scala b/core/src/main/scala/org/apache/spark/storage/BlockMessageArray.scala
index 5ded9ab..dc62b1e 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockMessageArray.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockMessageArray.scala
@@ -26,7 +26,7 @@ import org.apache.spark.network._
 
 private[spark]
 class BlockMessageArray(var blockMessages: Seq[BlockMessage])
-    extends Seq[BlockMessage] with Logging {
+  extends Seq[BlockMessage] with Logging {
   
   def this(bm: BlockMessage) = this(Array(bm))
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/919bd7f6/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
index b3deb41..ade8ba1 100644
--- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
@@ -94,12 +94,14 @@ private[spark] object JettyUtils extends Logging {
   }
 
   /**
-   * Attempts to start a Jetty server at the supplied hostName:port which uses the supplied handlers.
+   * Attempts to start a Jetty server at the supplied hostName:port which uses the supplied
+   * handlers.
    *
    * If the desired port number is contented, continues incrementing ports until a free port is
    * found. Returns the chosen port and the jetty Server object.
    */
-  def startJettyServer(hostName: String, port: Int, handlers: Seq[(String, Handler)]): (Server, Int) = {
+  def startJettyServer(hostName: String, port: Int, handlers: Seq[(String, Handler)]): (Server, Int)
+  = {
 
     val handlersToRegister = handlers.map { case(path, handler) =>
       val contextHandler = new ContextHandler(path)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/919bd7f6/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
index b95c8f4..547a194 100644
--- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
@@ -48,8 +48,8 @@ private[spark] object UIUtils {
       case _ => <li><a href={prependBaseUri("/environment")}>Environment</a></li>
     }
     val executors = page match {
-      case Executors => <li class="active"><a href={prependBaseUri("/executors")}>Executors</a>
-      </li>
+      case Executors =>
+        <li class="active"><a href={prependBaseUri("/executors")}>Executors</a></li>
       case _ => <li><a href={prependBaseUri("/executors")}>Executors</a></li>
     }
 
@@ -66,7 +66,8 @@ private[spark] object UIUtils {
         <div class="navbar navbar-static-top">
           <div class="navbar-inner">
             <a href={prependBaseUri("/")} class="brand">
-                <img src={prependBaseUri("/static/spark-logo-77x50px-hd.png")}  /></a>
+              <img src={prependBaseUri("/static/spark-logo-77x50px-hd.png")} />
+            </a>
             <ul class="nav">
               {jobs}
               {storage}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/919bd7f6/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala
index 9412a48..22bc97a 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala
@@ -61,9 +61,8 @@ private[spark] class PoolTable(pools: Seq[Schedulable], listener: JobProgressLis
     }
     <tr>
       <td>
-          <a href=
-             {"%s/stages/pool?poolname=%s".format(UIUtils.prependBaseUri(),p.name)}>
-              {p.name}</a></td>
+        <a href={"%s/stages/pool?poolname=%s".format(UIUtils.prependBaseUri(),p.name)}>{p.name}</a>
+      </td>
       <td>{p.minShare}</td>
       <td>{p.weight}</td>
       <td>{activeStages}</td>

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/919bd7f6/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 08107a3..b6e9894 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -223,28 +223,27 @@ private[spark] class StagePage(parent: JobProgressUI) {
     val gcTime = metrics.map(m => m.jvmGCTime).getOrElse(0L)
     val serializationTime = metrics.map(m => m.resultSerializationTime).getOrElse(0L)
 
-    val maybeShuffleRead = metrics.flatMap{m => m.shuffleReadMetrics}.map{s => s.remoteBytesRead}
+    val maybeShuffleRead = metrics.flatMap(m => m.shuffleReadMetrics).map(s => s.remoteBytesRead)
     val shuffleReadSortable = maybeShuffleRead.map(_.toString).getOrElse("")
-    val shuffleReadReadable = maybeShuffleRead.map{Utils.bytesToString(_)}.getOrElse("")
+    val shuffleReadReadable = maybeShuffleRead.map(Utils.bytesToString).getOrElse("")
 
     val maybeShuffleWrite =
-      metrics.flatMap{m => m.shuffleWriteMetrics}.map{s => s.shuffleBytesWritten}
+      metrics.flatMap{m => m.shuffleWriteMetrics}.map(s => s.shuffleBytesWritten)
     val shuffleWriteSortable = maybeShuffleWrite.map(_.toString).getOrElse("")
-    val shuffleWriteReadable = maybeShuffleWrite.map{Utils.bytesToString(_)}.getOrElse("")
+    val shuffleWriteReadable = maybeShuffleWrite.map(Utils.bytesToString).getOrElse("")
 
-    val maybeWriteTime = metrics.flatMap{m => m.shuffleWriteMetrics}.map{s => s.shuffleWriteTime}
+    val maybeWriteTime = metrics.flatMap(m => m.shuffleWriteMetrics).map(s => s.shuffleWriteTime)
     val writeTimeSortable = maybeWriteTime.map(_.toString).getOrElse("")
-    val writeTimeReadable = maybeWriteTime.map{ t => t / (1000 * 1000)}.map{ ms =>
+    val writeTimeReadable = maybeWriteTime.map( t => t / (1000 * 1000)).map{ ms =>
       if (ms == 0) "" else parent.formatDuration(ms)}.getOrElse("")
 
-    val maybeMemoryBytesSpilled = metrics.map{m => m.memoryBytesSpilled}
+    val maybeMemoryBytesSpilled = metrics.map(m => m.memoryBytesSpilled)
     val memoryBytesSpilledSortable = maybeMemoryBytesSpilled.map(_.toString).getOrElse("")
-    val memoryBytesSpilledReadable = maybeMemoryBytesSpilled.map{Utils.bytesToString(_)}
-      .getOrElse("")
+    val memoryBytesSpilledReadable = maybeMemoryBytesSpilled.map(Utils.bytesToString).getOrElse("")
 
     val maybeDiskBytesSpilled = metrics.map{m => m.diskBytesSpilled}
     val diskBytesSpilledSortable = maybeDiskBytesSpilled.map(_.toString).getOrElse("")
-    val diskBytesSpilledReadable = maybeDiskBytesSpilled.map{Utils.bytesToString(_)}.getOrElse("")
+    val diskBytesSpilledReadable = maybeDiskBytesSpilled.map(Utils.bytesToString).getOrElse("")
 
     <tr>
       <td>{info.index}</td>

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/919bd7f6/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
index 01b6479..999a94f 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
@@ -59,8 +59,8 @@ private[spark] class StageTable(val stages: Seq[StageInfo], val parent: JobProgr
     </table>
   }
 
-  private def makeProgressBar(started: Int, completed: Int, failed: String, total: Int)
-  : Seq[Node] = {
+  private def makeProgressBar(started: Int, completed: Int, failed: String, total: Int): Seq[Node] =
+  {
     val completeWidth = "width: %s%%".format((completed.toDouble/total)*100)
     val startWidth = "width: %s%%".format((started.toDouble/total)*100)
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/919bd7f6/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
index 3eb0f08..c0c057b 100644
--- a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
@@ -180,8 +180,8 @@ private[spark] object ClosureCleaner extends Logging {
   }
 }
 
-private[spark] class FieldAccessFinder(output: Map[Class[_], Set[String]])
-    extends ClassVisitor(ASM4) {
+private[spark]
+class FieldAccessFinder(output: Map[Class[_], Set[String]]) extends ClassVisitor(ASM4) {
   override def visitMethod(access: Int, name: String, desc: String,
       sig: String, exceptions: Array[String]): MethodVisitor = {
     new MethodVisitor(ASM4) {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/919bd7f6/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala b/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
index 5f86795..17c6481 100644
--- a/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
+++ b/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
@@ -241,7 +241,7 @@ private[spark] object SizeEstimator extends Logging {
     } else if (cls == classOf[Double]) {
       DOUBLE_SIZE
     } else {
-        throw new IllegalArgumentException(
+      throw new IllegalArgumentException(
       "Non-primitive class " + cls + " passed to primitiveSize()")
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/919bd7f6/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala b/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala
index 0097dad..4d2f45d 100644
--- a/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala
@@ -22,19 +22,21 @@ import org.apache.spark.SparkContext
 object BroadcastTest {
   def main(args: Array[String]) {
     if (args.length == 0) {
-      System.err.println("Usage: BroadcastTest <master> [slices] [numElem] [broadcastAlgo] [blockSize]")
+      System.err.println("Usage: BroadcastTest <master> [slices] [numElem] [broadcastAlgo]" +
+        " [blockSize]")
       System.exit(1)
-    }  
-    
+    }
+
     val bcName = if (args.length > 3) args(3) else "Http"
     val blockSize = if (args.length > 4) args(4) else "4096"
 
-    System.setProperty("spark.broadcast.factory", "org.apache.spark.broadcast." + bcName + "BroadcastFactory")
+    System.setProperty("spark.broadcast.factory", "org.apache.spark.broadcast." + bcName +
+      "BroadcastFactory")
     System.setProperty("spark.broadcast.blockSize", blockSize)
 
     val sc = new SparkContext(args(0), "Broadcast Test",
       System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
-    
+
     val slices = if (args.length > 1) args(1).toInt else 2
     val num = if (args.length > 2) args(2).toInt else 1000000
 
@@ -42,7 +44,7 @@ object BroadcastTest {
     for (i <- 0 until arr1.length) {
       arr1(i) = i
     }
-    
+
     for (i <- 0 until 3) {
       println("Iteration " + i)
       println("===========")

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/919bd7f6/examples/src/main/scala/org/apache/spark/examples/CassandraTest.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/CassandraTest.scala b/examples/src/main/scala/org/apache/spark/examples/CassandraTest.scala
index 33bf715..3e3a3b2 100644
--- a/examples/src/main/scala/org/apache/spark/examples/CassandraTest.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/CassandraTest.scala
@@ -146,68 +146,68 @@ assume Words keys as utf8;
 set Words['3musk001']['book'] = 'The Three Musketeers';
 set Words['3musk001']['para'] = 'On the first Monday of the month of April, 1625, the market
   town of Meung, in which the author of ROMANCE OF THE ROSE was born, appeared to
-	be in as perfect a state of revolution as if the Huguenots had just made
-	a second La Rochelle of it. Many citizens, seeing the women flying
-	toward the High Street, leaving their children crying at the open doors,
-	hastened to don the cuirass, and supporting their somewhat uncertain
-	courage with a musket or a partisan, directed their steps toward the
-	hostelry of the Jolly Miller, before which was gathered, increasing
-	every minute, a compact group, vociferous and full of curiosity.';
+ be in as perfect a state of revolution as if the Huguenots had just made
+ a second La Rochelle of it. Many citizens, seeing the women flying
+ toward the High Street, leaving their children crying at the open doors,
+ hastened to don the cuirass, and supporting their somewhat uncertain
+ courage with a musket or a partisan, directed their steps toward the
+ hostelry of the Jolly Miller, before which was gathered, increasing
+ every minute, a compact group, vociferous and full of curiosity.';
 
 set Words['3musk002']['book'] = 'The Three Musketeers';
 set Words['3musk002']['para'] = 'In those times panics were common, and few days passed without
   some city or other registering in its archives an event of this kind. There were
-	nobles, who made war against each other; there was the king, who made
-	war against the cardinal; there was Spain, which made war against the
-	king. Then, in addition to these concealed or public, secret or open
-	wars, there were robbers, mendicants, Huguenots, wolves, and scoundrels,
-	who made war upon everybody. The citizens always took up arms readily
-	against thieves, wolves or scoundrels, often against nobles or
-	Huguenots, sometimes against the king, but never against cardinal or
-	Spain. It resulted, then, from this habit that on the said first Monday
-	of April, 1625, the citizens, on hearing the clamor, and seeing neither
-	the red-and-yellow standard nor the livery of the Duc de Richelieu,
-	rushed toward the hostel of the Jolly Miller. When arrived there, the
-	cause of the hubbub was apparent to all';
+  nobles, who made war against each other; there was the king, who made
+  war against the cardinal; there was Spain, which made war against the
+  king. Then, in addition to these concealed or public, secret or open
+  wars, there were robbers, mendicants, Huguenots, wolves, and scoundrels,
+  who made war upon everybody. The citizens always took up arms readily
+  against thieves, wolves or scoundrels, often against nobles or
+  Huguenots, sometimes against the king, but never against cardinal or
+  Spain. It resulted, then, from this habit that on the said first Monday
+  of April, 1625, the citizens, on hearing the clamor, and seeing neither
+  the red-and-yellow standard nor the livery of the Duc de Richelieu,
+  rushed toward the hostel of the Jolly Miller. When arrived there, the
+  cause of the hubbub was apparent to all';
 
 set Words['3musk003']['book'] = 'The Three Musketeers';
 set Words['3musk003']['para'] = 'You ought, I say, then, to husband the means you have, however
   large the sum may be; but you ought also to endeavor to perfect yourself in
-	the exercises becoming a gentleman. I will write a letter today to the
-	Director of the Royal Academy, and tomorrow he will admit you without
-	any expense to yourself. Do not refuse this little service. Our
-	best-born and richest gentlemen sometimes solicit it without being able
-	to obtain it. You will learn horsemanship, swordsmanship in all its
-	branches, and dancing. You will make some desirable acquaintances; and
-	from time to time you can call upon me, just to tell me how you are
-	getting on, and to say whether I can be of further service to you.';
+  the exercises becoming a gentleman. I will write a letter today to the
+  Director of the Royal Academy, and tomorrow he will admit you without
+  any expense to yourself. Do not refuse this little service. Our
+  best-born and richest gentlemen sometimes solicit it without being able
+  to obtain it. You will learn horsemanship, swordsmanship in all its
+  branches, and dancing. You will make some desirable acquaintances; and
+  from time to time you can call upon me, just to tell me how you are
+  getting on, and to say whether I can be of further service to you.';
 
 
 set Words['thelostworld001']['book'] = 'The Lost World';
 set Words['thelostworld001']['para'] = 'She sat with that proud, delicate profile of hers outlined
   against the red curtain.  How beautiful she was!  And yet how aloof!  We had been
-	friends, quite good friends; but never could I get beyond the same
-	comradeship which I might have established with one of my
-	fellow-reporters upon the Gazette,--perfectly frank, perfectly kindly,
-	and perfectly unsexual.  My instincts are all against a woman being too
-	frank and at her ease with me.  It is no compliment to a man.  Where
-	the real sex feeling begins, timidity and distrust are its companions,
-	heritage from old wicked days when love and violence went often hand in
-	hand.  The bent head, the averted eye, the faltering voice, the wincing
-	figure--these, and not the unshrinking gaze and frank reply, are the
-	true signals of passion.  Even in my short life I had learned as much
-	as that--or had inherited it in that race memory which we call instinct.';
+  friends, quite good friends; but never could I get beyond the same
+  comradeship which I might have established with one of my
+  fellow-reporters upon the Gazette,--perfectly frank, perfectly kindly,
+  and perfectly unsexual.  My instincts are all against a woman being too
+  frank and at her ease with me.  It is no compliment to a man.  Where
+  the real sex feeling begins, timidity and distrust are its companions,
+  heritage from old wicked days when love and violence went often hand in
+  hand.  The bent head, the averted eye, the faltering voice, the wincing
+  figure--these, and not the unshrinking gaze and frank reply, are the
+  true signals of passion.  Even in my short life I had learned as much
+  as that--or had inherited it in that race memory which we call instinct.';
 
 set Words['thelostworld002']['book'] = 'The Lost World';
 set Words['thelostworld002']['para'] = 'I always liked McArdle, the crabbed, old, round-backed,
   red-headed news editor, and I rather hoped that he liked me.  Of course, Beaumont was
-	the real boss; but he lived in the rarefied atmosphere of some Olympian
-	height from which he could distinguish nothing smaller than an
-	international crisis or a split in the Cabinet.  Sometimes we saw him
-	passing in lonely majesty to his inner sanctum, with his eyes staring
-	vaguely and his mind hovering over the Balkans or the Persian Gulf.  He
-	was above and beyond us.  But McArdle was his first lieutenant, and it
-	was he that we knew.  The old man nodded as I entered the room, and he
-	pushed his spectacles far up on his bald forehead.';
+  the real boss; but he lived in the rarefied atmosphere of some Olympian
+  height from which he could distinguish nothing smaller than an
+  international crisis or a split in the Cabinet.  Sometimes we saw him
+  passing in lonely majesty to his inner sanctum, with his eyes staring
+  vaguely and his mind hovering over the Balkans or the Persian Gulf.  He
+  was above and beyond us.  But McArdle was his first lieutenant, and it
+  was he that we knew.  The old man nodded as I entered the room, and he
+  pushed his spectacles far up on his bald forehead.';
 
 */

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/919bd7f6/examples/src/main/scala/org/apache/spark/examples/ExceptionHandlingTest.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/ExceptionHandlingTest.scala b/examples/src/main/scala/org/apache/spark/examples/ExceptionHandlingTest.scala
index b3eb611..fdb976d 100644
--- a/examples/src/main/scala/org/apache/spark/examples/ExceptionHandlingTest.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/ExceptionHandlingTest.scala
@@ -29,8 +29,9 @@ object ExceptionHandlingTest {
     val sc = new SparkContext(args(0), "ExceptionHandlingTest",
       System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
     sc.parallelize(0 until sc.defaultParallelism).foreach { i =>
-      if (math.random > 0.75)
+      if (math.random > 0.75) {
         throw new Exception("Testing exception handling")
+      }
     }
 
     System.exit(0)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/919bd7f6/examples/src/main/scala/org/apache/spark/examples/GroupByTest.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/GroupByTest.scala b/examples/src/main/scala/org/apache/spark/examples/GroupByTest.scala
index 39752fd..36534e5 100644
--- a/examples/src/main/scala/org/apache/spark/examples/GroupByTest.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/GroupByTest.scala
@@ -24,7 +24,8 @@ import java.util.Random
 object GroupByTest {
   def main(args: Array[String]) {
     if (args.length == 0) {
-      System.err.println("Usage: GroupByTest <master> [numMappers] [numKVPairs] [KeySize] [numReducers]")
+      System.err.println(
+        "Usage: GroupByTest <master> [numMappers] [numKVPairs] [KeySize] [numReducers]")
       System.exit(1)
     }
     
@@ -35,7 +36,7 @@ object GroupByTest {
 
     val sc = new SparkContext(args(0), "GroupBy Test",
       System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
-    
+
     val pairs1 = sc.parallelize(0 until numMappers, numMappers).flatMap { p =>
       val ranGen = new Random
       var arr1 = new Array[(Int, Array[Byte])](numKVPairs)
@@ -48,7 +49,7 @@ object GroupByTest {
     }.cache
     // Enforce that everything has been calculated and in cache
     pairs1.count
-    
+
     println(pairs1.groupByKey(numReducers).count)
 
     System.exit(0)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/919bd7f6/examples/src/main/scala/org/apache/spark/examples/LocalFileLR.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/LocalFileLR.scala b/examples/src/main/scala/org/apache/spark/examples/LocalFileLR.scala
index 9ab5f5a..737c444 100644
--- a/examples/src/main/scala/org/apache/spark/examples/LocalFileLR.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/LocalFileLR.scala
@@ -28,7 +28,7 @@ object LocalFileLR {
 
   def parsePoint(line: String): DataPoint = {
     val nums = line.split(' ').map(_.toDouble)
-    DataPoint(new Vector(nums.slice(1, D+1)), nums(0))
+    DataPoint(new Vector(nums.slice(1, D + 1)), nums(0))
   }
 
   def main(args: Array[String]) {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/919bd7f6/examples/src/main/scala/org/apache/spark/examples/LocalKMeans.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/LocalKMeans.scala b/examples/src/main/scala/org/apache/spark/examples/LocalKMeans.scala
index a730464..3895675 100644
--- a/examples/src/main/scala/org/apache/spark/examples/LocalKMeans.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/LocalKMeans.scala
@@ -80,7 +80,11 @@ object LocalKMeans {
 
       var mappings = closest.groupBy[Int] (x => x._1)
 
-      var pointStats = mappings.map(pair => pair._2.reduceLeft [(Int, (Vector, Int))] {case ((id1, (x1, y1)), (id2, (x2, y2))) => (id1, (x1 + x2, y1+y2))})
+      var pointStats = mappings.map { pair =>
+        pair._2.reduceLeft [(Int, (Vector, Int))] {
+          case ((id1, (x1, y1)), (id2, (x2, y2))) => (id1, (x1 + x2, y1 + y2))
+        }
+      }
 
       var newPoints = pointStats.map {mapping => (mapping._1, mapping._2._1/mapping._2._2)}
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/919bd7f6/examples/src/main/scala/org/apache/spark/examples/LogQuery.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/LogQuery.scala b/examples/src/main/scala/org/apache/spark/examples/LogQuery.scala
index 544c782..fcaba6b 100644
--- a/examples/src/main/scala/org/apache/spark/examples/LogQuery.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/LogQuery.scala
@@ -50,10 +50,10 @@ object LogQuery {
     val dataSet =
       if (args.length == 2) sc.textFile(args(1))
       else sc.parallelize(exampleApacheLogs)
-
+    // scalastyle:off
     val apacheLogRegex =
       """^([\d.]+) (\S+) (\S+) \[([\w\d:/]+\s[+\-]\d{4})\] "(.+?)" (\d{3}) ([\d\-]+) "([^"]+)" "([^"]+)".*""".r
-
+    // scalastyle:on
     /** Tracks the total query count and number of aggregate bytes for a particular group. */
     class Stats(val count: Int, val numBytes: Int) extends Serializable {
       def merge(other: Stats) = new Stats(count + other.count, numBytes + other.numBytes)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/919bd7f6/examples/src/main/scala/org/apache/spark/examples/SkewedGroupByTest.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/SkewedGroupByTest.scala b/examples/src/main/scala/org/apache/spark/examples/SkewedGroupByTest.scala
index 31c6d10..966478f 100644
--- a/examples/src/main/scala/org/apache/spark/examples/SkewedGroupByTest.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/SkewedGroupByTest.scala
@@ -24,7 +24,8 @@ import java.util.Random
 object SkewedGroupByTest {
   def main(args: Array[String]) {
     if (args.length == 0) {
-      System.err.println("Usage: GroupByTest <master> [numMappers] [numKVPairs] [KeySize] [numReducers]")
+      System.err.println(
+        "Usage: GroupByTest <master> [numMappers] [numKVPairs] [KeySize] [numReducers]")
       System.exit(1)
     }  
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/919bd7f6/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala b/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala
index 3981906..cf1fc3e 100644
--- a/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala
@@ -56,7 +56,8 @@ object SparkHdfsLR {
     val sc = new SparkContext(args(0), "SparkHdfsLR",
       System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass), Map(), 
       InputFormatInfo.computePreferredLocations(
-          Seq(new InputFormatInfo(conf, classOf[org.apache.hadoop.mapred.TextInputFormat], inputPath))))
+        Seq(new InputFormatInfo(conf, classOf[org.apache.hadoop.mapred.TextInputFormat], inputPath))
+      ))
     val lines = sc.textFile(inputPath)
     val points = lines.map(parsePoint _).cache()
     val ITERATIONS = args(2).toInt

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/919bd7f6/examples/src/main/scala/org/apache/spark/examples/bagel/PageRankUtils.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/bagel/PageRankUtils.scala b/examples/src/main/scala/org/apache/spark/examples/bagel/PageRankUtils.scala
index cfafbaf..b97cb8f 100644
--- a/examples/src/main/scala/org/apache/spark/examples/bagel/PageRankUtils.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/bagel/PageRankUtils.scala
@@ -43,16 +43,18 @@ class PageRankUtils extends Serializable {
     val terminate = superstep >= 10
 
     val outbox: Array[PRMessage] =
-      if (!terminate)
-        self.outEdges.map(targetId =>
-          new PRMessage(targetId, newValue / self.outEdges.size))
-      else
+      if (!terminate) {
+        self.outEdges.map(targetId => new PRMessage(targetId, newValue / self.outEdges.size))
+      } else {
         Array[PRMessage]()
+      }
 
     (new PRVertex(newValue, self.outEdges, !terminate), outbox)
   }
 
-  def computeNoCombiner(numVertices: Long, epsilon: Double)(self: PRVertex, messages: Option[Array[PRMessage]], superstep: Int): (PRVertex, Array[PRMessage]) =
+  def computeNoCombiner(numVertices: Long, epsilon: Double)
+    (self: PRVertex, messages: Option[Array[PRMessage]], superstep: Int)
+  : (PRVertex, Array[PRMessage]) =
     computeWithCombiner(numVertices, epsilon)(self, messages match {
       case Some(msgs) => Some(msgs.map(_.value).sum)
       case None => None
@@ -81,7 +83,8 @@ class PRVertex() extends Vertex with Serializable {
   }
 
   override def toString(): String = {
-    "PRVertex(value=%f, outEdges.length=%d, active=%s)".format(value, outEdges.length, active.toString)
+    "PRVertex(value=%f, outEdges.length=%d, active=%s)"
+      .format(value, outEdges.length, active.toString)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/919bd7f6/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRank.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRank.scala b/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRank.scala
index 4c0de46..25bd55c 100644
--- a/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRank.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRank.scala
@@ -33,7 +33,8 @@ import scala.xml.{XML,NodeSeq}
 object WikipediaPageRank {
   def main(args: Array[String]) {
     if (args.length < 5) {
-      System.err.println("Usage: WikipediaPageRank <inputFile> <threshold> <numPartitions> <host> <usePartitioner>")
+      System.err.println(
+        "Usage: WikipediaPageRank <inputFile> <threshold> <numPartitions> <host> <usePartitioner>")
       System.exit(-1)
     }
     val sparkConf = new SparkConf()
@@ -61,24 +62,26 @@ object WikipediaPageRank {
       val fields = line.split("\t")
       val (title, body) = (fields(1), fields(3).replace("\\n", "\n"))
       val links =
-        if (body == "\\N")
+        if (body == "\\N") {
           NodeSeq.Empty
-        else
+        } else {
           try {
             XML.loadString(body) \\ "link" \ "target"
           } catch {
             case e: org.xml.sax.SAXParseException =>
-              System.err.println("Article \""+title+"\" has malformed XML in body:\n"+body)
+              System.err.println("Article \"" + title + "\" has malformed XML in body:\n" + body)
             NodeSeq.Empty
           }
+        }
       val outEdges = links.map(link => new String(link.text)).toArray
       val id = new String(title)
       (id, new PRVertex(1.0 / numVertices, outEdges))
     })
-    if (usePartitioner)
+    if (usePartitioner) {
       vertices = vertices.partitionBy(new HashPartitioner(sc.defaultParallelism)).cache
-    else
+    } else {
       vertices = vertices.cache
+    }
     println("Done parsing input file.")
 
     // Do the computation
@@ -92,7 +95,7 @@ object WikipediaPageRank {
           utils.computeWithCombiner(numVertices, epsilon))
 
     // Print the result
-    System.err.println("Articles with PageRank >= "+threshold+":")
+    System.err.println("Articles with PageRank >= " + threshold + ":")
     val top =
       (result
        .filter { case (id, vertex) => vertex.value >= threshold }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/919bd7f6/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRankStandalone.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRankStandalone.scala b/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRankStandalone.scala
index 2cf273a..27afa6b 100644
--- a/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRankStandalone.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRankStandalone.scala
@@ -31,7 +31,8 @@ import org.apache.spark.rdd.RDD
 object WikipediaPageRankStandalone {
   def main(args: Array[String]) {
     if (args.length < 5) {
-      System.err.println("Usage: WikipediaPageRankStandalone <inputFile> <threshold> <numIterations> <host> <usePartitioner>")
+      System.err.println("Usage: WikipediaPageRankStandalone <inputFile> <threshold> " +
+        "<numIterations> <host> <usePartitioner>")
       System.exit(-1)
     }
     val sparkConf = new SparkConf()
@@ -51,10 +52,11 @@ object WikipediaPageRankStandalone {
     val input = sc.textFile(inputFile)
     val partitioner = new HashPartitioner(sc.defaultParallelism)
     val links =
-      if (usePartitioner)
+      if (usePartitioner) {
         input.map(parseArticle _).partitionBy(partitioner).cache()
-      else
+      } else {
         input.map(parseArticle _).cache()
+      }
     val n = links.count()
     val defaultRank = 1.0 / n
     val a = 0.15
@@ -62,10 +64,11 @@ object WikipediaPageRankStandalone {
     // Do the computation
     val startTime = System.currentTimeMillis
     val ranks =
-        pageRank(links, numIterations, defaultRank, a, n, partitioner, usePartitioner, sc.defaultParallelism)
+      pageRank(links, numIterations, defaultRank, a, n, partitioner, usePartitioner,
+        sc.defaultParallelism)
 
     // Print the result
-    System.err.println("Articles with PageRank >= "+threshold+":")
+    System.err.println("Articles with PageRank >= " + threshold + ":")
     val top =
       (ranks
        .filter { case (id, rank) => rank >= threshold }
@@ -75,7 +78,7 @@ object WikipediaPageRankStandalone {
 
     val time = (System.currentTimeMillis - startTime) / 1000.0
     println("Completed %d iterations in %f seconds: %f seconds per iteration"
-            .format(numIterations, time, time / numIterations))
+      .format(numIterations, time, time / numIterations))
     System.exit(0)
   }
 
@@ -84,16 +87,17 @@ object WikipediaPageRankStandalone {
     val (title, body) = (fields(1), fields(3).replace("\\n", "\n"))
     val id = new String(title)
     val links =
-      if (body == "\\N")
+      if (body == "\\N") {
         NodeSeq.Empty
-      else
+      } else {
         try {
           XML.loadString(body) \\ "link" \ "target"
         } catch {
           case e: org.xml.sax.SAXParseException =>
-            System.err.println("Article \""+title+"\" has malformed XML in body:\n"+body)
+            System.err.println("Article \"" + title + "\" has malformed XML in body:\n" + body)
           NodeSeq.Empty
         }
+      }
     val outEdges = links.map(link => new String(link.text)).toArray
     (id, outEdges)
   }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/919bd7f6/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala
index bc0d163..3d7b390 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala
@@ -132,7 +132,7 @@ object FeederActor {
  * To run this example locally, you may run Feeder Actor as
  *    `$ ./bin/run-example org.apache.spark.streaming.examples.FeederActor 127.0.1.1 9999`
  * and then run the example
- *    `$ ./bin/run-example org.apache.spark.streaming.examples.ActorWordCount local[2] 127.0.1.1 9999`
+ *    `./bin/run-example org.apache.spark.streaming.examples.ActorWordCount local[2] 127.0.1.1 9999`
  */
 object ActorWordCount {
   def main(args: Array[String]) {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/919bd7f6/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala
index d9cb732..6bccd1d 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala
@@ -26,6 +26,7 @@ import org.apache.spark.streaming.StreamingContext._
 import org.apache.spark.streaming.kafka._
 import org.apache.spark.streaming.util.RawTextHelper._
 
+// scalastyle:off
 /**
  * Consumes messages from one or more topics in Kafka and does wordcount.
  * Usage: KafkaWordCount <master> <zkQuorum> <group> <topics> <numThreads>
@@ -38,6 +39,7 @@ import org.apache.spark.streaming.util.RawTextHelper._
  * Example:
  *    `./bin/run-example org.apache.spark.streaming.examples.KafkaWordCount local[2] zoo01,zoo02,zoo03 my-consumer-group topic1,topic2 1`
  */
+// scalastyle:on
 object KafkaWordCount {
   def main(args: Array[String]) {
     if (args.length < 5) {
@@ -56,7 +58,7 @@ object KafkaWordCount {
     val topicpMap = topics.split(",").map((_,numThreads.toInt)).toMap
     val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2)
     val words = lines.flatMap(_.split(" "))
-    val wordCounts = words.map(x => (x, 1l))
+    val wordCounts = words.map(x => (x, 1L))
       .reduceByKeyAndWindow(add _, subtract _, Minutes(10), Seconds(2), 2)
     wordCounts.print()
     

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/919bd7f6/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala
index eb61caf..0a68ac8 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala
@@ -64,6 +64,7 @@ object MQTTPublisher {
   }
 }
 
+// scalastyle:off
 /**
  * A sample wordcount with MqttStream stream
  *
@@ -71,7 +72,8 @@ object MQTTPublisher {
  * Mosquitto (http://mosquitto.org/) is an open source Mqtt Broker
  * In ubuntu mosquitto can be installed using the command  `$ sudo apt-get install mosquitto`
  * Eclipse paho project provides Java library for Mqtt Client http://www.eclipse.org/paho/
- * Example Java code for Mqtt Publisher and Subscriber can be found here https://bitbucket.org/mkjinesh/mqttclient
+ * Example Java code for Mqtt Publisher and Subscriber can be found here
+ * https://bitbucket.org/mkjinesh/mqttclient
  * Usage: MQTTWordCount <master> <MqttbrokerUrl> <topic>
  * In local mode, <master> should be 'local[n]' with n > 1
  *   <MqttbrokerUrl> and <topic> describe where Mqtt publisher is running.
@@ -81,6 +83,7 @@ object MQTTPublisher {
  * and run the example as
  *    `$ ./bin/run-example org.apache.spark.streaming.examples.MQTTWordCount local[2] tcp://localhost:1883 foo`
  */
+// scalastyle:on
 object MQTTWordCount {
 
   def main(args: Array[String]) {
@@ -93,7 +96,7 @@ object MQTTWordCount {
 
     val Seq(master, brokerUrl, topic) = args.toSeq
 
-    val ssc = new StreamingContext(master, "MqttWordCount", Seconds(2), System.getenv("SPARK_HOME"), 
+    val ssc = new StreamingContext(master, "MqttWordCount", Seconds(2), System.getenv("SPARK_HOME"),
     StreamingContext.jarOfClass(this.getClass))
     val lines = MQTTUtils.createStream(ssc, brokerUrl, topic, StorageLevel.MEMORY_ONLY_SER_2)
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/919bd7f6/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala
index 5656d48..d4c4d86 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala
@@ -21,6 +21,7 @@ import org.apache.spark.streaming.{Seconds, StreamingContext}
 import org.apache.spark.streaming.StreamingContext._
 import org.apache.spark.storage.StorageLevel
 
+// scalastyle:off
 /**
  * Counts words in text encoded with UTF8 received from the network every second.
  *
@@ -33,6 +34,7 @@ import org.apache.spark.storage.StorageLevel
  * and then run the example
  *    `$ ./bin/run-example org.apache.spark.streaming.examples.NetworkWordCount local[2] localhost 9999`
  */
+// scalastyle:on
 object NetworkWordCount {
   def main(args: Array[String]) {
     if (args.length < 3) {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/919bd7f6/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala
index aa82bf3..56d10a9 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala
@@ -30,8 +30,8 @@ import java.nio.charset.Charset
  *
  * Usage: NetworkWordCount <master> <hostname> <port> <checkpoint-directory> <output-file>
  *   <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1.
- *   <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
- *   <checkpoint-directory> directory to HDFS-compatible file system which checkpoint data
+ *   <hostname> and <port> describe the TCP server that Spark Streaming connects to for data.
+ *   <checkpoint-directory> directory in an HDFS-compatible file system for checkpoint data
  *   <output-file> file to which the word counts will be appended
  *
  * In local mode, <master> should be 'local[n]' with n > 1
@@ -54,11 +54,13 @@ import java.nio.charset.Charset
  *
  * To run this example in a local standalone cluster with automatic driver recovery,
  *
- *      `$ ./spark-class org.apache.spark.deploy.Client -s launch <cluster-url> <path-to-examples-jar> \
+ *      `$ ./spark-class org.apache.spark.deploy.Client -s launch <cluster-url> \
+ *              <path-to-examples-jar> \
  *              org.apache.spark.streaming.examples.RecoverableNetworkWordCount <cluster-url> \
  *              localhost 9999 ~/checkpoint ~/out`
  *
- * <path-to-examples-jar> would typically be <spark-dir>/examples/target/scala-XX/spark-examples....jar
+ * <path-to-examples-jar> would typically be
+ * <spark-dir>/examples/target/scala-XX/spark-examples....jar
  *
  * Refer to the online documentation for more details.
  */
@@ -96,11 +98,12 @@ object RecoverableNetworkWordCount {
       System.err.println("You arguments were " + args.mkString("[", ", ", "]"))
       System.err.println(
         """
-          |Usage: RecoverableNetworkWordCount <master> <hostname> <port> <checkpoint-directory> <output-file>
-          |     <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1.
-          |     <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
-          |     <checkpoint-directory> directory to HDFS-compatible file system which checkpoint data
-          |     <output-file> file to which the word counts will be appended
+          |Usage: RecoverableNetworkWordCount <master> <hostname> <port> <checkpoint-directory>
+          |     <output-file> <master> is the Spark master URL. In local mode, <master> should be
+          |     'local[n]' with n > 1. <hostname> and <port> describe the TCP server that Spark
+          |     Streaming would connect to receive data. <checkpoint-directory> directory to
+          |     HDFS-compatible file system which checkpoint data <output-file> file to which the
+          |     word counts will be appended
           |
           |In local mode, <master> should be 'local[n]' with n > 1
           |Both <checkpoint-directory> and <output-file> must be absolute paths

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/919bd7f6/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala
index bbd4494..8a654f8 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala
@@ -24,7 +24,7 @@ import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.{Seconds, StreamingContext}
 import org.apache.spark.streaming.StreamingContext._
 import org.apache.spark.streaming.twitter._
-
+// scalastyle:off
 /**
  * Illustrates the use of the Count-Min Sketch, from Twitter's Algebird library, to compute
  * windowed and global Top-K estimates of user IDs occurring in a Twitter stream.
@@ -34,15 +34,19 @@ import org.apache.spark.streaming.twitter._
  *   the same approach could be used for computing popular topics for example.
  * <p>
  * <p>
- *   <a href="http://highlyscalable.wordpress.com/2012/05/01/probabilistic-structures-web-analytics-data-mining/">
- *   This blog post</a> has a good overview of the Count-Min Sketch (CMS). The CMS is a data structure
- *   for approximate frequency estimation in data streams (e.g. Top-K elements, frequency of any given element, etc),
- *   that uses space sub-linear in the number of elements in the stream. Once elements are added to the CMS, the
- *   estimated count of an element can be computed, as well as "heavy-hitters" that occur more than a threshold
- *   percentage of the overall total count.
+ *   <a href=
+ *   "http://highlyscalable.wordpress.com/2012/05/01/probabilistic-structures-web-analytics-data-mining/">
+ *   This blog post</a> has a good overview of the Count-Min Sketch (CMS). The CMS is a data
+ *   structure for approximate frequency estimation in data streams (e.g. Top-K elements, frequency
+ *   of any given element, etc), that uses space sub-linear in the number of elements in the
+ *   stream. Once elements are added to the CMS, the estimated count of an element can be computed,
+ *   as well as "heavy-hitters" that occur more than a threshold percentage of the overall total
+ *   count.
  * <p><p>
- *   Algebird's implementation is a monoid, so we can succinctly merge two CMS instances in the reduce operation.
+ *   Algebird's implementation is a monoid, so we can succinctly merge two CMS instances in the
+ *   reduce operation.
  */
+// scalastyle:on
 object TwitterAlgebirdCMS {
   def main(args: Array[String]) {
     if (args.length < 1) {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/919bd7f6/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala
index c6215fd..45771d7 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala
@@ -29,8 +29,7 @@ import org.apache.spark.streaming.twitter._
  * a windowed and global estimate of the unique user IDs occurring in a Twitter stream.
  * <p>
  * <p>
- *   This <a href= "http://highlyscalable.wordpress.com/2012/05/01/probabilistic-structures-web-analytics-data
- * -mining/">
+ *   This <a href="http://highlyscalable.wordpress.com/2012/05/01/probabilistic-structures-web-analytics-data-mining/">
  *   blog post</a> and this
  *   <a href= "http://highscalability.com/blog/2012/4/5/big-data-counting-how-to-count-a-billion-distinct-objects-us.html">
  *     blog post</a>

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/919bd7f6/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala
index 85b4ce5..35be7ff 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala
@@ -53,11 +53,13 @@ object SimpleZeroMQPublisher {
   }
 }
 
+// scalastyle:off
 /**
  * A sample wordcount with ZeroMQStream stream
  *
  * To work with zeroMQ, some native libraries have to be installed.
- * Install zeroMQ (release 2.1) core libraries. [ZeroMQ Install guide](http://www.zeromq.org/intro:get-the-software)
+ * Install zeroMQ (release 2.1) core libraries. See the ZeroMQ install guide:
+ * http://www.zeromq.org/intro:get-the-software
  * 
  * Usage: ZeroMQWordCount <master> <zeroMQurl> <topic>
  * In local mode, <master> should be 'local[n]' with n > 1
@@ -68,6 +70,7 @@ object SimpleZeroMQPublisher {
  * and run the example as
  *    `$ ./bin/run-example org.apache.spark.streaming.examples.ZeroMQWordCount local[2] tcp://127.0.1.1:1234 foo`
  */
+// scalastyle:on
 object ZeroMQWordCount {
   def main(args: Array[String]) {
     if (args.length < 3) {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/919bd7f6/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
index 799a9dd..f2296a8 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
@@ -67,8 +67,7 @@ class EdgeRDD[@specialized ED: ClassTag](
   }
 
   private[graphx] def mapEdgePartitions[ED2: ClassTag](
-    f: (PartitionID, EdgePartition[ED]) => EdgePartition[ED2])
-    : EdgeRDD[ED2] = {
+      f: (PartitionID, EdgePartition[ED]) => EdgePartition[ED2]): EdgeRDD[ED2] = {
     new EdgeRDD[ED2](partitionsRDD.mapPartitions({ iter =>
       val (pid, ep) = iter.next()
       Iterator(Tuple2(pid, f(pid, ep)))

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/919bd7f6/project/project/SparkPluginBuild.scala
----------------------------------------------------------------------
diff --git a/project/project/SparkPluginBuild.scala b/project/project/SparkPluginBuild.scala
index 0392a60..a88a5e1 100644
--- a/project/project/SparkPluginBuild.scala
+++ b/project/project/SparkPluginBuild.scala
@@ -20,5 +20,7 @@ import sbt._
 object SparkPluginDef extends Build {
   lazy val root = Project("plugins", file(".")) dependsOn(junitXmlListener)
   /* This is not published in a Maven repository, so we get it from GitHub directly */
-  lazy val junitXmlListener = uri("git://github.com/chenkelmann/junit_xml_listener.git#3f8029fbfda54dc7a68b1afd2f885935e1090016")
+  lazy val junitXmlListener = uri(
+    "https://github.com/chenkelmann/junit_xml_listener.git#3f8029fbfda54dc7a68b1afd2f885935e1090016"
+  )
 }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/919bd7f6/scalastyle-config.xml
----------------------------------------------------------------------
diff --git a/scalastyle-config.xml b/scalastyle-config.xml
index 7527232..ee968c5 100644
--- a/scalastyle-config.xml
+++ b/scalastyle-config.xml
@@ -1,4 +1,21 @@
-<!-- If you wish to turn off checking for a section of code, you can put a comment in the source before and after the section, with the following syntax: -->
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<!-- If you wish to turn off checking for a section of code, you can put a comment in the source
+ before and after the section, with the following syntax: -->
 <!-- // scalastyle:off -->
 <!-- ... -->
 <!-- // naughty stuff -->

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/919bd7f6/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 4d778dc..baf80fe 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -128,7 +128,8 @@ class CheckpointWriter(
       while (attempts < MAX_ATTEMPTS && !stopped) {
         attempts += 1
         try {
-          logInfo("Saving checkpoint for time " + checkpointTime + " to file '" + checkpointFile + "'")
+          logInfo("Saving checkpoint for time " + checkpointTime + " to file '" + checkpointFile
+            + "'")
 
           // Write checkpoint to temp file
           fs.delete(tempFile, true)   // just in case it exists
@@ -167,11 +168,13 @@ class CheckpointWriter(
           return
         } catch {
           case ioe: IOException =>
-            logWarning("Error in attempt " + attempts + " of writing checkpoint to " + checkpointFile, ioe)
+            logWarning("Error in attempt " + attempts + " of writing checkpoint to "
+              + checkpointFile, ioe)
             reset()
         }
       }
-      logWarning("Could not write checkpoint for time " + checkpointTime + " to file " + checkpointFile + "'")
+      logWarning("Could not write checkpoint for time " + checkpointTime + " to file "
+        + checkpointFile + "'")
     }
   }
 
@@ -220,7 +223,8 @@ class CheckpointWriter(
 private[streaming]
 object CheckpointReader extends Logging {
 
-  def read(checkpointDir: String, conf: SparkConf, hadoopConf: Configuration): Option[Checkpoint] = {
+  def read(checkpointDir: String, conf: SparkConf, hadoopConf: Configuration): Option[Checkpoint] =
+  {
     val checkpointPath = new Path(checkpointDir)
     def fs = checkpointPath.getFileSystem(hadoopConf)
     

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/919bd7f6/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
index 0683113..fde4670 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
@@ -153,7 +153,8 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
   def validate() {
     this.synchronized {
       assert(batchDuration != null, "Batch duration has not been set")
-      //assert(batchDuration >= Milliseconds(100), "Batch duration of " + batchDuration + " is very low")
+      //assert(batchDuration >= Milliseconds(100), "Batch duration of " + batchDuration +
+      // " is very low")
       assert(getOutputStreams().size > 0, "No output streams registered, so nothing to execute")
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/919bd7f6/streaming/src/main/scala/org/apache/spark/streaming/Interval.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Interval.scala b/streaming/src/main/scala/org/apache/spark/streaming/Interval.scala
index 04c994c..16479a0 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Interval.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Interval.scala
@@ -33,7 +33,8 @@ class Interval(val beginTime: Time, val endTime: Time) {
 
   def < (that: Interval): Boolean = {
     if (this.duration != that.duration) {
-      throw new Exception("Comparing two intervals with different durations [" + this + ", " + that + "]")
+      throw new Exception("Comparing two intervals with different durations [" + this + ", "
+        + that + "]")
     }
     this.endTime < that.endTime
   }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/919bd7f6/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
index 64fe204..7aa7ead 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
@@ -78,8 +78,8 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
 
   /**
    * Return a new DStream in which each RDD has a single element generated by counting the number
-   * of elements in a window over this DStream. windowDuration and slideDuration are as defined in the
-   * window() operation. This is equivalent to window(windowDuration, slideDuration).count()
+   * of elements in a window over this DStream. windowDuration and slideDuration are as defined in
+   * the window() operation. This is equivalent to window(windowDuration, slideDuration).count()
    */
   def countByWindow(windowDuration: Duration, slideDuration: Duration) : JavaDStream[JLong] = {
     dstream.countByWindow(windowDuration, slideDuration)
@@ -87,8 +87,8 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
 
   /**
    * Return a new DStream in which each RDD contains the count of distinct elements in
-   * RDDs in a sliding window over this DStream. Hash partitioning is used to generate the RDDs with
-   * Spark's default number of partitions.
+   * RDDs in a sliding window over this DStream. Hash partitioning is used to generate the RDDs
+   * with Spark's default number of partitions.
    * @param windowDuration width of the window; must be a multiple of this DStream's
    *                       batching interval
    * @param slideDuration  sliding interval of the window (i.e., the interval after which
@@ -103,8 +103,8 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
 
   /**
    * Return a new DStream in which each RDD contains the count of distinct elements in
-   * RDDs in a sliding window over this DStream. Hash partitioning is used to generate the RDDs with `numPartitions`
-   * partitions.
+   * RDDs in a sliding window over this DStream. Hash partitioning is used to generate the RDDs
+   * with `numPartitions` partitions.
    * @param windowDuration width of the window; must be a multiple of this DStream's
    *                       batching interval
    * @param slideDuration  sliding interval of the window (i.e., the interval after which