Posted to commits@spark.apache.org by pw...@apache.org on 2014/03/27 02:25:00 UTC

git commit: Spark 1095: Adding explicit return types to all public methods

Repository: spark
Updated Branches:
  refs/heads/master be6d96c15 -> 3e63d98f0


Spark 1095: Adding explicit return types to all public methods

Excluded the methods whose return types are self-evident and the cases discussed on the mailing list.
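
For context, a minimal, self-contained sketch of the convention this patch enforces (the object and method names below are hypothetical, not taken from the patch): public members declare their result types explicitly instead of relying on type inference, so a change to a method body cannot silently change the public API.

import scala.collection.mutable

object ExplicitReturnTypesExample {

  // Before: the result type is inferred from the body.
  //   def newCounters() = mutable.Map.empty[String, Int]

  // After: the result type is stated explicitly, so the public
  // signature is fixed independently of the implementation.
  def newCounters(): mutable.Map[String, Int] =
    mutable.Map.empty[String, Int]
}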

Author: NirmalReddy <ni...@yahoo.com>
Author: NirmalReddy <ni...@imaginea.com>

Closes #168 from NirmalReddy/Spark-1095 and squashes the following commits:

ac54b29 [NirmalReddy] import misplaced
8c5ff3e [NirmalReddy] Changed syntax of unit returning methods
02d0778 [NirmalReddy] fixed explicit types in all the other packages
1c17773 [NirmalReddy] fixed explicit types in core package
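
The "Changed syntax of unit returning methods" commit (8c5ff3e) applies the same idea to side-effecting methods. A hypothetical sketch of that rewrite (made-up method names, not from the patch):

object UnitSyntaxExample {
  private def writeReport(path: String): Unit =
    println(s"writing report to " + path)

  // Before: definition with the result type inferred as Unit.
  //   def saveReport(path: String) = writeReport(path)

  // After: explicit ": Unit" result type and a block body.
  def saveReport(path: String): Unit = {
    writeReport(path)
  }
}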


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3e63d98f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3e63d98f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3e63d98f

Branch: refs/heads/master
Commit: 3e63d98f09065386901d78c141b0da93cdce0f76
Parents: be6d96c
Author: NirmalReddy <ni...@yahoo.com>
Authored: Wed Mar 26 18:24:55 2014 -0700
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Wed Mar 26 18:24:55 2014 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   | 31 ++++++++++++--------
 .../org/apache/spark/api/java/JavaRDDLike.scala | 15 +++++++---
 .../spark/api/java/JavaSparkContext.scala       |  2 +-
 .../apache/spark/deploy/ClientArguments.scala   |  2 +-
 .../apache/spark/deploy/SparkHadoopUtil.scala   |  2 +-
 .../master/ZooKeeperPersistenceEngine.scala     |  3 +-
 .../apache/spark/metrics/sink/ConsoleSink.scala |  2 +-
 .../org/apache/spark/metrics/sink/CsvSink.scala |  2 +-
 .../spark/metrics/sink/GraphiteSink.scala       |  4 +--
 .../org/apache/spark/rdd/CoGroupedRDD.scala     |  2 +-
 .../scala/org/apache/spark/rdd/HadoopRDD.scala  |  2 +-
 .../scala/org/apache/spark/rdd/JdbcRDD.scala    |  2 +-
 .../org/apache/spark/rdd/NewHadoopRDD.scala     |  2 +-
 .../main/scala/org/apache/spark/rdd/RDD.scala   |  2 +-
 .../org/apache/spark/storage/StorageLevel.scala |  7 +++--
 .../org/apache/spark/util/Distribution.scala    |  7 +++--
 .../apache/spark/metrics/sink/GangliaSink.scala | 12 ++++----
 .../scala/org/apache/spark/graphx/Graph.scala   |  3 +-
 .../apache/spark/graphx/impl/GraphImpl.scala    |  2 +-
 .../org/apache/spark/graphx/lib/Analytics.scala |  2 +-
 .../spark/streaming/StreamingContext.scala      |  4 +--
 .../streaming/api/java/JavaDStreamLike.scala    |  6 ++--
 .../api/java/JavaStreamingContext.scala         | 22 ++++++++++----
 .../spark/streaming/dstream/DStream.scala       |  8 +++--
 .../spark/streaming/scheduler/BatchInfo.scala   |  8 +++--
 25 files changed, 97 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3e63d98f/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 4dd2981..b23accb 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -35,6 +35,7 @@ import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat, Job => NewHad
 import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
 import org.apache.mesos.MesosNativeLibrary
 
+import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
 import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
 import org.apache.spark.rdd._
@@ -230,7 +231,7 @@ class SparkContext(
   postEnvironmentUpdate()
 
   /** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
-  val hadoopConfiguration = {
+  val hadoopConfiguration: Configuration = {
     val env = SparkEnv.get
     val hadoopConf = SparkHadoopUtil.get.newConfiguration()
     // Explicitly check for S3 environment variables
@@ -630,7 +631,7 @@ class SparkContext(
    * standard mutable collections. So you can use this with mutable Map, Set, etc.
    */
   def accumulableCollection[R <% Growable[T] with TraversableOnce[T] with Serializable, T]
-      (initialValue: R) = {
+      (initialValue: R): Accumulable[R, T] = {
     val param = new GrowableAccumulableParam[R,T]
     new Accumulable(initialValue, param)
   }
@@ -640,7 +641,7 @@ class SparkContext(
    * [[org.apache.spark.broadcast.Broadcast]] object for reading it in distributed functions.
    * The variable will be sent to each cluster only once.
    */
-  def broadcast[T](value: T) = env.broadcastManager.newBroadcast[T](value, isLocal)
+  def broadcast[T](value: T): Broadcast[T] = env.broadcastManager.newBroadcast[T](value, isLocal)
 
   /**
    * Add a file to be downloaded with this Spark job on every node.
@@ -1126,7 +1127,7 @@ object SparkContext extends Logging {
   implicit def rddToAsyncRDDActions[T: ClassTag](rdd: RDD[T]) = new AsyncRDDActions(rdd)
 
   implicit def rddToSequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable: ClassTag](
-      rdd: RDD[(K, V)])   =
+      rdd: RDD[(K, V)]) =
     new SequenceFileRDDFunctions(rdd)
 
   implicit def rddToOrderedRDDFunctions[K <% Ordered[K]: ClassTag, V: ClassTag](
@@ -1163,27 +1164,33 @@ object SparkContext extends Logging {
   }
 
   // Helper objects for converting common types to Writable
-  private def simpleWritableConverter[T, W <: Writable: ClassTag](convert: W => T) = {
+  private def simpleWritableConverter[T, W <: Writable: ClassTag](convert: W => T)
+      : WritableConverter[T] = {
     val wClass = classTag[W].runtimeClass.asInstanceOf[Class[W]]
     new WritableConverter[T](_ => wClass, x => convert(x.asInstanceOf[W]))
   }
 
-  implicit def intWritableConverter() = simpleWritableConverter[Int, IntWritable](_.get)
+  implicit def intWritableConverter(): WritableConverter[Int] =
+    simpleWritableConverter[Int, IntWritable](_.get)
 
-  implicit def longWritableConverter() = simpleWritableConverter[Long, LongWritable](_.get)
+  implicit def longWritableConverter(): WritableConverter[Long] =
+    simpleWritableConverter[Long, LongWritable](_.get)
 
-  implicit def doubleWritableConverter() = simpleWritableConverter[Double, DoubleWritable](_.get)
+  implicit def doubleWritableConverter(): WritableConverter[Double] =
+    simpleWritableConverter[Double, DoubleWritable](_.get)
 
-  implicit def floatWritableConverter() = simpleWritableConverter[Float, FloatWritable](_.get)
+  implicit def floatWritableConverter(): WritableConverter[Float] =
+    simpleWritableConverter[Float, FloatWritable](_.get)
 
-  implicit def booleanWritableConverter() =
+  implicit def booleanWritableConverter(): WritableConverter[Boolean] =
     simpleWritableConverter[Boolean, BooleanWritable](_.get)
 
-  implicit def bytesWritableConverter() = {
+  implicit def bytesWritableConverter(): WritableConverter[Array[Byte]] = {
     simpleWritableConverter[Array[Byte], BytesWritable](_.getBytes)
   }
 
-  implicit def stringWritableConverter() = simpleWritableConverter[String, Text](_.toString)
+  implicit def stringWritableConverter(): WritableConverter[String] =
+    simpleWritableConverter[String, Text](_.toString)
 
   implicit def writableWritableConverter[T <: Writable]() =
     new WritableConverter[T](_.runtimeClass.asInstanceOf[Class[T]], _.asInstanceOf[T])

http://git-wip-us.apache.org/repos/asf/spark/blob/3e63d98f/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index ddac553..e03b8e7 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -391,19 +391,24 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
   /**
    * Save this RDD as a text file, using string representations of elements.
    */
-  def saveAsTextFile(path: String) = rdd.saveAsTextFile(path)
+  def saveAsTextFile(path: String): Unit = {
+    rdd.saveAsTextFile(path)
+  }
 
 
   /**
    * Save this RDD as a compressed text file, using string representations of elements.
    */
-  def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]) =
+  def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit = {
     rdd.saveAsTextFile(path, codec)
+  }
 
   /**
    * Save this RDD as a SequenceFile of serialized objects.
    */
-  def saveAsObjectFile(path: String) = rdd.saveAsObjectFile(path)
+  def saveAsObjectFile(path: String): Unit = {
+    rdd.saveAsObjectFile(path)
+  }
 
   /**
    * Creates tuples of the elements in this RDD by applying `f`.
@@ -420,7 +425,9 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    * executed on this RDD. It is strongly recommended that this RDD is persisted in
    * memory, otherwise saving it on a file will require recomputation.
    */
-  def checkpoint() = rdd.checkpoint()
+  def checkpoint(): Unit = {
+    rdd.checkpoint()
+  }
 
   /**
    * Return whether this RDD has been checkpointed or not

http://git-wip-us.apache.org/repos/asf/spark/blob/3e63d98f/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
index 35508b6..e531a57 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -463,7 +463,7 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
     sc.setCheckpointDir(dir)
   }
 
-  def getCheckpointDir = JavaUtils.optionToOptional(sc.getCheckpointDir)
+  def getCheckpointDir: Optional[String] = JavaUtils.optionToOptional(sc.getCheckpointDir)
 
   protected def checkpointFile[T](path: String): JavaRDD[T] = {
     implicit val ctag: ClassTag[T] = fakeClassTag

http://git-wip-us.apache.org/repos/asf/spark/blob/3e63d98f/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala b/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala
index 00f5cd5..c07838f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala
@@ -112,5 +112,5 @@ private[spark] class ClientArguments(args: Array[String]) {
 }
 
 object ClientArguments {
-  def isValidJarUrl(s: String) = s.matches("(.+):(.+)jar")
+  def isValidJarUrl(s: String): Boolean = s.matches("(.+):(.+)jar")
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/3e63d98f/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index d2d8d6d..9bdbfb3 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -32,7 +32,7 @@ import scala.collection.JavaConversions._
  * Contains util methods to interact with Hadoop from Spark.
  */
 class SparkHadoopUtil {
-  val conf = newConfiguration()
+  val conf: Configuration = newConfiguration()
   UserGroupInformation.setConfiguration(conf)
 
   def runAsUser(user: String)(func: () => Unit) {

http://git-wip-us.apache.org/repos/asf/spark/blob/3e63d98f/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
index 5413ff6..834dfed 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
@@ -20,6 +20,7 @@ package org.apache.spark.deploy.master
 import scala.collection.JavaConversions._
 
 import akka.serialization.Serialization
+import org.apache.curator.framework.CuratorFramework
 import org.apache.zookeeper.CreateMode
 
 import org.apache.spark.{Logging, SparkConf}
@@ -29,7 +30,7 @@ class ZooKeeperPersistenceEngine(serialization: Serialization, conf: SparkConf)
   with Logging
 {
   val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark") + "/master_status"
-  val zk = SparkCuratorUtil.newClient(conf)
+  val zk: CuratorFramework = SparkCuratorUtil.newClient(conf)
 
   SparkCuratorUtil.mkdir(zk, WORKING_DIR)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/3e63d98f/core/src/main/scala/org/apache/spark/metrics/sink/ConsoleSink.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/ConsoleSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/ConsoleSink.scala
index 4d2ffc5..64eac73 100644
--- a/core/src/main/scala/org/apache/spark/metrics/sink/ConsoleSink.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/sink/ConsoleSink.scala
@@ -38,7 +38,7 @@ class ConsoleSink(val property: Properties, val registry: MetricRegistry,
     case None => CONSOLE_DEFAULT_PERIOD
   }
 
-  val pollUnit = Option(property.getProperty(CONSOLE_KEY_UNIT)) match {
+  val pollUnit: TimeUnit = Option(property.getProperty(CONSOLE_KEY_UNIT)) match {
     case Some(s) => TimeUnit.valueOf(s.toUpperCase())
     case None => TimeUnit.valueOf(CONSOLE_DEFAULT_UNIT)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/3e63d98f/core/src/main/scala/org/apache/spark/metrics/sink/CsvSink.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/CsvSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/CsvSink.scala
index 319f408..544848d 100644
--- a/core/src/main/scala/org/apache/spark/metrics/sink/CsvSink.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/sink/CsvSink.scala
@@ -41,7 +41,7 @@ class CsvSink(val property: Properties, val registry: MetricRegistry,
     case None => CSV_DEFAULT_PERIOD
   }
 
-  val pollUnit = Option(property.getProperty(CSV_KEY_UNIT)) match {
+  val pollUnit: TimeUnit = Option(property.getProperty(CSV_KEY_UNIT)) match {
     case Some(s) => TimeUnit.valueOf(s.toUpperCase())
     case None => TimeUnit.valueOf(CSV_DEFAULT_UNIT)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/3e63d98f/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala
index 0ffdf38..7f0a2fd 100644
--- a/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala
@@ -39,7 +39,7 @@ class GraphiteSink(val property: Properties, val registry: MetricRegistry,
   val GRAPHITE_KEY_UNIT = "unit"
   val GRAPHITE_KEY_PREFIX = "prefix"
 
-  def propertyToOption(prop: String) = Option(property.getProperty(prop))
+  def propertyToOption(prop: String): Option[String] = Option(property.getProperty(prop))
 
   if (!propertyToOption(GRAPHITE_KEY_HOST).isDefined) {
     throw new Exception("Graphite sink requires 'host' property.")
@@ -57,7 +57,7 @@ class GraphiteSink(val property: Properties, val registry: MetricRegistry,
     case None => GRAPHITE_DEFAULT_PERIOD
   }
 
-  val pollUnit = propertyToOption(GRAPHITE_KEY_UNIT) match {
+  val pollUnit: TimeUnit = propertyToOption(GRAPHITE_KEY_UNIT) match {
     case Some(s) => TimeUnit.valueOf(s.toUpperCase())
     case None => TimeUnit.valueOf(GRAPHITE_DEFAULT_UNIT)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/3e63d98f/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
index 8561711..9aa454a 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -103,7 +103,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
     array
   }
 
-  override val partitioner = Some(part)
+  override val partitioner: Some[Partitioner] = Some(part)
 
   override def compute(s: Partition, context: TaskContext): Iterator[(K, CoGroupCombiner)] = {
     val sparkConf = SparkEnv.get.conf

http://git-wip-us.apache.org/repos/asf/spark/blob/3e63d98f/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 932ff5b..3af008b 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -171,7 +171,7 @@ class HadoopRDD[K, V](
     array
   }
 
-  override def compute(theSplit: Partition, context: TaskContext) = {
+  override def compute(theSplit: Partition, context: TaskContext): InterruptibleIterator[(K, V)] = {
     val iter = new NextIterator[(K, V)] {
 
       val split = theSplit.asInstanceOf[HadoopPartition]

http://git-wip-us.apache.org/repos/asf/spark/blob/3e63d98f/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala b/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala
index 8df8718..1b50374 100644
--- a/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala
@@ -116,7 +116,7 @@ class JdbcRDD[T: ClassTag](
 }
 
 object JdbcRDD {
-  def resultSetToObjectArray(rs: ResultSet) = {
+  def resultSetToObjectArray(rs: ResultSet): Array[Object] = {
     Array.tabulate[Object](rs.getMetaData.getColumnCount)(i => rs.getObject(i + 1))
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/3e63d98f/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index d1fff29..461a749 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -80,7 +80,7 @@ class NewHadoopRDD[K, V](
     result
   }
 
-  override def compute(theSplit: Partition, context: TaskContext) = {
+  override def compute(theSplit: Partition, context: TaskContext): InterruptibleIterator[(K, V)] = {
     val iter = new Iterator[(K, V)] {
       val split = theSplit.asInstanceOf[NewHadoopPartition]
       logInfo("Input split: " + split.serializableHadoopSplit)

http://git-wip-us.apache.org/repos/asf/spark/blob/3e63d98f/core/src/main/scala/org/apache/spark/rdd/RDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 6af4224..ce2b8ac 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -121,7 +121,7 @@ abstract class RDD[T: ClassTag](
   @transient var name: String = null
 
   /** Assign a name to this RDD */
-  def setName(_name: String) = {
+  def setName(_name: String): RDD[T] = {
     name = _name
     this
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/3e63d98f/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
index 1b7934d..4212a53 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
@@ -126,15 +126,16 @@ object StorageLevel {
   val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, 2)
 
   /** Create a new StorageLevel object */
-  def apply(useDisk: Boolean, useMemory: Boolean, deserialized: Boolean, replication: Int = 1) =
+  def apply(useDisk: Boolean, useMemory: Boolean, deserialized: Boolean,
+      replication: Int = 1): StorageLevel =
     getCachedStorageLevel(new StorageLevel(useDisk, useMemory, deserialized, replication))
 
   /** Create a new StorageLevel object from its integer representation */
-  def apply(flags: Int, replication: Int) =
+  def apply(flags: Int, replication: Int): StorageLevel =
     getCachedStorageLevel(new StorageLevel(flags, replication))
 
   /** Read StorageLevel object from ObjectInput stream */
-  def apply(in: ObjectInput) = {
+  def apply(in: ObjectInput): StorageLevel = {
     val obj = new StorageLevel()
     obj.readExternal(in)
     getCachedStorageLevel(obj)

http://git-wip-us.apache.org/repos/asf/spark/blob/3e63d98f/core/src/main/scala/org/apache/spark/util/Distribution.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Distribution.scala b/core/src/main/scala/org/apache/spark/util/Distribution.scala
index ab738c4..5b34755 100644
--- a/core/src/main/scala/org/apache/spark/util/Distribution.scala
+++ b/core/src/main/scala/org/apache/spark/util/Distribution.scala
@@ -19,6 +19,8 @@ package org.apache.spark.util
 
 import java.io.PrintStream
 
+import scala.collection.immutable.IndexedSeq
+
 /**
  * Util for getting some stats from a small sample of numeric values, with some handy
  * summary functions.
@@ -40,7 +42,8 @@ class Distribution(val data: Array[Double], val startIdx: Int, val endIdx: Int)
    * given from 0 to 1
    * @param probabilities
    */
-  def getQuantiles(probabilities: Traversable[Double] = defaultProbabilities) = {
+  def getQuantiles(probabilities: Traversable[Double] = defaultProbabilities)
+      : IndexedSeq[Double] = {
     probabilities.toIndexedSeq.map{p:Double => data(closestIndex(p))}
   }
 
@@ -48,7 +51,7 @@ class Distribution(val data: Array[Double], val startIdx: Int, val endIdx: Int)
     math.min((p * length).toInt + startIdx, endIdx - 1)
   }
 
-  def showQuantiles(out: PrintStream = System.out) = {
+  def showQuantiles(out: PrintStream = System.out): Unit = {
     out.println("min\t25%\t50%\t75%\tmax")
     getQuantiles(defaultProbabilities).foreach{q => out.print(q + "\t")}
     out.println

http://git-wip-us.apache.org/repos/asf/spark/blob/3e63d98f/extras/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala
----------------------------------------------------------------------
diff --git a/extras/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala b/extras/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala
index cd37317..d03d777 100644
--- a/extras/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala
+++ b/extras/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala
@@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit
 import com.codahale.metrics.MetricRegistry
 import com.codahale.metrics.ganglia.GangliaReporter
 import info.ganglia.gmetric4j.gmetric.GMetric
+import info.ganglia.gmetric4j.gmetric.GMetric.UDPAddressingMode
 
 import org.apache.spark.SecurityManager
 import org.apache.spark.metrics.MetricsSystem
@@ -33,10 +34,10 @@ class GangliaSink(val property: Properties, val registry: MetricRegistry,
   val GANGLIA_DEFAULT_PERIOD = 10
 
   val GANGLIA_KEY_UNIT = "unit"
-  val GANGLIA_DEFAULT_UNIT = TimeUnit.SECONDS
+  val GANGLIA_DEFAULT_UNIT: TimeUnit = TimeUnit.SECONDS
 
   val GANGLIA_KEY_MODE = "mode"
-  val GANGLIA_DEFAULT_MODE = GMetric.UDPAddressingMode.MULTICAST
+  val GANGLIA_DEFAULT_MODE: UDPAddressingMode = GMetric.UDPAddressingMode.MULTICAST
 
   // TTL for multicast messages. If listeners are X hops away in network, must be at least X.
   val GANGLIA_KEY_TTL = "ttl"
@@ -45,7 +46,7 @@ class GangliaSink(val property: Properties, val registry: MetricRegistry,
   val GANGLIA_KEY_HOST = "host"
   val GANGLIA_KEY_PORT = "port"
 
-  def propertyToOption(prop: String) = Option(property.getProperty(prop))
+  def propertyToOption(prop: String): Option[String] = Option(property.getProperty(prop))
 
   if (!propertyToOption(GANGLIA_KEY_HOST).isDefined) {
     throw new Exception("Ganglia sink requires 'host' property.")
@@ -58,11 +59,12 @@ class GangliaSink(val property: Properties, val registry: MetricRegistry,
   val host = propertyToOption(GANGLIA_KEY_HOST).get
   val port = propertyToOption(GANGLIA_KEY_PORT).get.toInt
   val ttl = propertyToOption(GANGLIA_KEY_TTL).map(_.toInt).getOrElse(GANGLIA_DEFAULT_TTL)
-  val mode = propertyToOption(GANGLIA_KEY_MODE)
+  val mode: UDPAddressingMode = propertyToOption(GANGLIA_KEY_MODE)
     .map(u => GMetric.UDPAddressingMode.valueOf(u.toUpperCase)).getOrElse(GANGLIA_DEFAULT_MODE)
   val pollPeriod = propertyToOption(GANGLIA_KEY_PERIOD).map(_.toInt)
     .getOrElse(GANGLIA_DEFAULT_PERIOD)
-  val pollUnit = propertyToOption(GANGLIA_KEY_UNIT).map(u => TimeUnit.valueOf(u.toUpperCase))
+  val pollUnit: TimeUnit = propertyToOption(GANGLIA_KEY_UNIT)
+    .map(u => TimeUnit.valueOf(u.toUpperCase))
     .getOrElse(GANGLIA_DEFAULT_UNIT)
 
   MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod)

http://git-wip-us.apache.org/repos/asf/spark/blob/3e63d98f/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
index 65a1a8c..ef05623 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
@@ -419,5 +419,6 @@ object Graph {
    * All the convenience operations are defined in the [[GraphOps]] class which may be
    * shared across multiple graph implementations.
    */
-  implicit def graphToGraphOps[VD: ClassTag, ED: ClassTag](g: Graph[VD, ED]) = g.ops
+  implicit def graphToGraphOps[VD: ClassTag, ED: ClassTag]
+      (g: Graph[VD, ED]): GraphOps[VD, ED] = g.ops
 } // end of Graph object

http://git-wip-us.apache.org/repos/asf/spark/blob/3e63d98f/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
index 5e9be18..43ac11d 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
@@ -197,7 +197,7 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
   override def mapReduceTriplets[A: ClassTag](
       mapFunc: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
       reduceFunc: (A, A) => A,
-      activeSetOpt: Option[(VertexRDD[_], EdgeDirection)] = None) = {
+      activeSetOpt: Option[(VertexRDD[_], EdgeDirection)] = None): VertexRDD[A] = {
 
     ClosureCleaner.clean(mapFunc)
     ClosureCleaner.clean(reduceFunc)

http://git-wip-us.apache.org/repos/asf/spark/blob/3e63d98f/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala
index 24699df..fa533a5 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala
@@ -26,7 +26,7 @@ import org.apache.spark.graphx.PartitionStrategy._
  */
 object Analytics extends Logging {
 
-  def main(args: Array[String]) = {
+  def main(args: Array[String]): Unit = {
     val host = args(0)
     val taskType = args(1)
     val fname = args(2)

http://git-wip-us.apache.org/repos/asf/spark/blob/3e63d98f/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 062b888..e198c69 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -431,7 +431,7 @@ class StreamingContext private[streaming] (
    * Stop the execution of the streams.
    * @param stopSparkContext Stop the associated SparkContext or not
    */
-  def stop(stopSparkContext: Boolean = true) = synchronized {
+  def stop(stopSparkContext: Boolean = true): Unit = synchronized {
     scheduler.stop()
     logInfo("StreamingContext stopped successfully")
     waiter.notifyStop()
@@ -489,7 +489,7 @@ object StreamingContext extends Logging {
    * Find the JAR from which a given class was loaded, to make it easy for users to pass
    * their JARs to StreamingContext.
    */
-  def jarOfClass(cls: Class[_]) = SparkContext.jarOfClass(cls)
+  def jarOfClass(cls: Class[_]): Seq[String] = SparkContext.jarOfClass(cls)
 
   private[streaming] def createNewSparkContext(conf: SparkConf): SparkContext = {
     // Set the default cleaner delay to an hour if not already set.

http://git-wip-us.apache.org/repos/asf/spark/blob/3e63d98f/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
index a85cd04..bb2f492 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
@@ -49,7 +49,9 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
    * Print the first ten elements of each RDD generated in this DStream. This is an output
    * operator, so this DStream will be registered as an output stream and there materialized.
    */
-  def print() = dstream.print()
+  def print(): Unit = {
+    dstream.print()
+  }
 
   /**
    * Return a new DStream in which each RDD has a single element generated by counting each RDD
@@ -401,7 +403,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
    * Enable periodic checkpointing of RDDs of this DStream.
    * @param interval Time interval after which generated RDD will be checkpointed
    */
-  def checkpoint(interval: Duration) = {
+  def checkpoint(interval: Duration): DStream[T] = {
     dstream.checkpoint(interval)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/3e63d98f/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index c48d754..b705d2e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -477,31 +477,41 @@ class JavaStreamingContext(val ssc: StreamingContext) {
   /**
    * Start the execution of the streams.
    */
-  def start() = ssc.start()
+  def start(): Unit = {
+    ssc.start()
+  }
 
   /**
    * Wait for the execution to stop. Any exceptions that occurs during the execution
    * will be thrown in this thread.
    */
-  def awaitTermination() = ssc.awaitTermination()
+  def awaitTermination(): Unit = {
+    ssc.awaitTermination()
+  }
 
   /**
    * Wait for the execution to stop. Any exceptions that occurs during the execution
    * will be thrown in this thread.
    * @param timeout time to wait in milliseconds
    */
-  def awaitTermination(timeout: Long) = ssc.awaitTermination(timeout)
+  def awaitTermination(timeout: Long): Unit = {
+    ssc.awaitTermination(timeout)
+  }
 
   /**
    * Stop the execution of the streams. Will stop the associated JavaSparkContext as well.
    */
-  def stop() = ssc.stop()
+  def stop(): Unit = {
+    ssc.stop()
+  }
 
   /**
    * Stop the execution of the streams.
    * @param stopSparkContext Stop the associated SparkContext or not
    */
-  def stop(stopSparkContext: Boolean) = ssc.stop(stopSparkContext)
+  def stop(stopSparkContext: Boolean): Unit = {
+    ssc.stop(stopSparkContext)
+  }
 }
 
 /**
@@ -579,7 +589,7 @@ object JavaStreamingContext {
    * Find the JAR from which a given class was loaded, to make it easy for users to pass
    * their JARs to StreamingContext.
    */
-  def jarOfClass(cls: Class[_]) = SparkContext.jarOfClass(cls).toArray
+  def jarOfClass(cls: Class[_]): Array[String] = SparkContext.jarOfClass(cls).toArray
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/3e63d98f/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index 6bff56a..d48b51a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -503,14 +503,18 @@ abstract class DStream[T: ClassTag] (
    * 'this' DStream will be registered as an output stream and therefore materialized.
    */
   @deprecated("use foreachRDD", "0.9.0")
-  def foreach(foreachFunc: RDD[T] => Unit) = this.foreachRDD(foreachFunc)
+  def foreach(foreachFunc: RDD[T] => Unit): Unit = {
+    this.foreachRDD(foreachFunc)
+  }
 
   /**
    * Apply a function to each RDD in this DStream. This is an output operator, so
    * 'this' DStream will be registered as an output stream and therefore materialized.
    */
   @deprecated("use foreachRDD", "0.9.0")
-  def foreach(foreachFunc: (RDD[T], Time) => Unit) = this.foreachRDD(foreachFunc)
+  def foreach(foreachFunc: (RDD[T], Time) => Unit): Unit = {
+    this.foreachRDD(foreachFunc)
+  }
 
   /**
    * Apply a function to each RDD in this DStream. This is an output operator, so

http://git-wip-us.apache.org/repos/asf/spark/blob/3e63d98f/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
index 4e8d07f..7f3cd2f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
@@ -39,17 +39,19 @@ case class BatchInfo(
    * was submitted to the streaming scheduler. Essentially, it is
    * `processingStartTime` - `submissionTime`.
    */
-  def schedulingDelay = processingStartTime.map(_ - submissionTime)
+  def schedulingDelay: Option[Long] = processingStartTime.map(_ - submissionTime)
 
   /**
    * Time taken for the all jobs of this batch to finish processing from the time they started
    * processing. Essentially, it is `processingEndTime` - `processingStartTime`.
    */
-  def processingDelay = processingEndTime.zip(processingStartTime).map(x => x._1 - x._2).headOption
+  def processingDelay: Option[Long] = processingEndTime.zip(processingStartTime)
+    .map(x => x._1 - x._2).headOption
 
   /**
    * Time taken for all the jobs of this batch to finish processing from the time they
    * were submitted.  Essentially, it is `processingDelay` + `schedulingDelay`.
    */
-  def totalDelay = schedulingDelay.zip(processingDelay).map(x => x._1 + x._2).headOption
+  def totalDelay: Option[Long] = schedulingDelay.zip(processingDelay)
+    .map(x => x._1 + x._2).headOption
 }