You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2014/09/27 09:57:30 UTC

git commit: Minor cleanup to tighten visibility and remove compilation warning.

Repository: spark
Updated Branches:
  refs/heads/master 2d972fd84 -> 436a7730b


Minor cleanup to tighten visibility and remove compilation warning.

Author: Reynold Xin <rx...@apache.org>

Closes #2555 from rxin/cleanup and squashes the following commits:

6add199 [Reynold Xin] Minor cleanup to tighten visibility and remove compilation warning.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/436a7730
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/436a7730
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/436a7730

Branch: refs/heads/master
Commit: 436a7730b6e7067f74b3739a3a412490003f7c4c
Parents: 2d972fd
Author: Reynold Xin <rx...@apache.org>
Authored: Sat Sep 27 00:57:26 2014 -0700
Committer: Reynold Xin <rx...@apache.org>
Committed: Sat Sep 27 00:57:26 2014 -0700

----------------------------------------------------------------------
 .../spark/input/WholeTextFileRecordReader.scala | 24 +++++-----
 .../apache/spark/metrics/MetricsSystem.scala    | 28 +++++++-----
 .../spark/metrics/MetricsSystemSuite.scala      | 33 ++++++++------
 .../spark/streaming/StreamingContextSuite.scala | 47 ++++++++++----------
 4 files changed, 70 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/436a7730/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala b/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala
index c3dabd2..3564ab2 100644
--- a/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala
+++ b/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala
@@ -36,33 +36,31 @@ private[spark] class WholeTextFileRecordReader(
     index: Integer)
   extends RecordReader[String, String] {
 
-  private val path = split.getPath(index)
-  private val fs = path.getFileSystem(context.getConfiguration)
+  private[this] val path = split.getPath(index)
+  private[this] val fs = path.getFileSystem(context.getConfiguration)
 
   // True means the current file has been processed, then skip it.
-  private var processed = false
+  private[this] var processed = false
 
-  private val key = path.toString
-  private var value: String = null
+  private[this] val key = path.toString
+  private[this] var value: String = null
 
-  override def initialize(split: InputSplit, context: TaskAttemptContext) = {}
+  override def initialize(split: InputSplit, context: TaskAttemptContext): Unit = {}
 
-  override def close() = {}
+  override def close(): Unit = {}
 
-  override def getProgress = if (processed) 1.0f else 0.0f
+  override def getProgress: Float = if (processed) 1.0f else 0.0f
 
-  override def getCurrentKey = key
+  override def getCurrentKey: String = key
 
-  override def getCurrentValue = value
+  override def getCurrentValue: String = value
 
-  override def nextKeyValue = {
+  override def nextKeyValue(): Boolean = {
     if (!processed) {
       val fileIn = fs.open(path)
       val innerBuffer = ByteStreams.toByteArray(fileIn)
-
       value = new Text(innerBuffer).toString
       Closeables.close(fileIn, false)
-
       processed = true
       true
     } else {

http://git-wip-us.apache.org/repos/asf/spark/blob/436a7730/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
index 6ef817d..fd316a8 100644
--- a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
@@ -63,15 +63,18 @@ import org.apache.spark.metrics.source.Source
  *
  * [options] is the specific property of this source or sink.
  */
-private[spark] class MetricsSystem private (val instance: String,
-    conf: SparkConf, securityMgr: SecurityManager) extends Logging {
+private[spark] class MetricsSystem private (
+    val instance: String,
+    conf: SparkConf,
+    securityMgr: SecurityManager)
+  extends Logging {
 
-  val confFile = conf.get("spark.metrics.conf", null)
-  val metricsConfig = new MetricsConfig(Option(confFile))
+  private[this] val confFile = conf.get("spark.metrics.conf", null)
+  private[this] val metricsConfig = new MetricsConfig(Option(confFile))
 
-  val sinks = new mutable.ArrayBuffer[Sink]
-  val sources = new mutable.ArrayBuffer[Source]
-  val registry = new MetricRegistry()
+  private val sinks = new mutable.ArrayBuffer[Sink]
+  private val sources = new mutable.ArrayBuffer[Source]
+  private val registry = new MetricRegistry()
 
   // Treat MetricsServlet as a special sink as it should be exposed to add handlers to web ui
   private var metricsServlet: Option[MetricsServlet] = None
@@ -91,7 +94,7 @@ private[spark] class MetricsSystem private (val instance: String,
     sinks.foreach(_.stop)
   }
 
-  def report(): Unit = {
+  def report() {
     sinks.foreach(_.report())
   }
 
@@ -155,8 +158,8 @@ private[spark] object MetricsSystem {
   val SINK_REGEX = "^sink\\.(.+)\\.(.+)".r
   val SOURCE_REGEX = "^source\\.(.+)\\.(.+)".r
 
-  val MINIMAL_POLL_UNIT = TimeUnit.SECONDS
-  val MINIMAL_POLL_PERIOD = 1
+  private[this] val MINIMAL_POLL_UNIT = TimeUnit.SECONDS
+  private[this] val MINIMAL_POLL_PERIOD = 1
 
   def checkMinimalPollingPeriod(pollUnit: TimeUnit, pollPeriod: Int) {
     val period = MINIMAL_POLL_UNIT.convert(pollPeriod, pollUnit)
@@ -166,7 +169,8 @@ private[spark] object MetricsSystem {
     }
   }
 
-  def createMetricsSystem(instance: String, conf: SparkConf,
-      securityMgr: SecurityManager): MetricsSystem =
+  def createMetricsSystem(
+      instance: String, conf: SparkConf, securityMgr: SecurityManager): MetricsSystem = {
     new MetricsSystem(instance, conf, securityMgr)
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/436a7730/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala b/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala
index 96a5a12..e42b181 100644
--- a/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala
+++ b/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala
@@ -17,42 +17,47 @@
 
 package org.apache.spark.metrics
 
-import org.scalatest.{BeforeAndAfter, FunSuite}
+import org.apache.spark.metrics.source.Source
+import org.scalatest.{BeforeAndAfter, FunSuite, PrivateMethodTester}
+
 import org.apache.spark.{SecurityManager, SparkConf}
 import org.apache.spark.deploy.master.MasterSource
 
-class MetricsSystemSuite extends FunSuite with BeforeAndAfter {
+import scala.collection.mutable.ArrayBuffer
+
+
+class MetricsSystemSuite extends FunSuite with BeforeAndAfter with PrivateMethodTester{
   var filePath: String = _
   var conf: SparkConf = null
   var securityMgr: SecurityManager = null
 
   before {
-    filePath = getClass.getClassLoader.getResource("test_metrics_system.properties").getFile()
+    filePath = getClass.getClassLoader.getResource("test_metrics_system.properties").getFile
     conf = new SparkConf(false).set("spark.metrics.conf", filePath)
     securityMgr = new SecurityManager(conf)
   }
 
   test("MetricsSystem with default config") {
     val metricsSystem = MetricsSystem.createMetricsSystem("default", conf, securityMgr)
-    val sources = metricsSystem.sources
-    val sinks = metricsSystem.sinks
+    val sources = PrivateMethod[ArrayBuffer[Source]]('sources)
+    val sinks = PrivateMethod[ArrayBuffer[Source]]('sinks)
 
-    assert(sources.length === 0)
-    assert(sinks.length === 0)
-    assert(!metricsSystem.getServletHandlers.isEmpty)
+    assert(metricsSystem.invokePrivate(sources()).length === 0)
+    assert(metricsSystem.invokePrivate(sinks()).length === 0)
+    assert(metricsSystem.getServletHandlers.nonEmpty)
   }
 
   test("MetricsSystem with sources add") {
     val metricsSystem = MetricsSystem.createMetricsSystem("test", conf, securityMgr)
-    val sources = metricsSystem.sources
-    val sinks = metricsSystem.sinks
+    val sources = PrivateMethod[ArrayBuffer[Source]]('sources)
+    val sinks = PrivateMethod[ArrayBuffer[Source]]('sinks)
 
-    assert(sources.length === 0)
-    assert(sinks.length === 1)
-    assert(!metricsSystem.getServletHandlers.isEmpty)
+    assert(metricsSystem.invokePrivate(sources()).length === 0)
+    assert(metricsSystem.invokePrivate(sinks()).length === 1)
+    assert(metricsSystem.getServletHandlers.nonEmpty)
 
     val source = new MasterSource(null)
     metricsSystem.registerSource(source)
-    assert(sources.length === 1)
+    assert(metricsSystem.invokePrivate(sources()).length === 1)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/436a7730/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index ebf8374..655cec1 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -19,18 +19,18 @@ package org.apache.spark.streaming
 
 import java.util.concurrent.atomic.AtomicInteger
 
-import scala.language.postfixOps
+import org.scalatest.{Assertions, BeforeAndAfter, FunSuite}
+import org.scalatest.concurrent.Timeouts
+import org.scalatest.concurrent.Eventually._
+import org.scalatest.exceptions.TestFailedDueToTimeoutException
+import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.{Logging, SparkConf, SparkContext, SparkException}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.dstream.DStream
 import org.apache.spark.streaming.receiver.Receiver
 import org.apache.spark.util.Utils
-import org.scalatest.{Assertions, BeforeAndAfter, FunSuite}
-import org.scalatest.concurrent.Timeouts
-import org.scalatest.concurrent.Eventually._
-import org.scalatest.exceptions.TestFailedDueToTimeoutException
-import org.scalatest.time.SpanSugar._
+
 
 class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts with Logging {
 
@@ -68,7 +68,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
   test("from no conf + spark home + env") {
     ssc = new StreamingContext(master, appName, batchDuration,
       sparkHome, Nil, Map(envPair))
-    assert(ssc.conf.getExecutorEnv.exists(_ == envPair))
+    assert(ssc.conf.getExecutorEnv.contains(envPair))
   }
 
   test("from conf with settings") {
@@ -94,7 +94,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
     val myConf = SparkContext.updatedConf(new SparkConf(false), master, appName)
     myConf.set("spark.cleaner.ttl", "10")
     val ssc1 = new StreamingContext(myConf, batchDuration)
-    addInputStream(ssc1).register
+    addInputStream(ssc1).register()
     ssc1.start()
     val cp = new Checkpoint(ssc1, Time(1000))
     assert(cp.sparkConfPairs.toMap.getOrElse("spark.cleaner.ttl", "-1") === "10")
@@ -107,7 +107,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
 
   test("start and stop state check") {
     ssc = new StreamingContext(master, appName, batchDuration)
-    addInputStream(ssc).register
+    addInputStream(ssc).register()
 
     assert(ssc.state === ssc.StreamingContextState.Initialized)
     ssc.start()
@@ -118,7 +118,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
 
   test("start multiple times") {
     ssc = new StreamingContext(master, appName, batchDuration)
-    addInputStream(ssc).register
+    addInputStream(ssc).register()
     ssc.start()
     intercept[SparkException] {
       ssc.start()
@@ -127,7 +127,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
 
   test("stop multiple times") {
     ssc = new StreamingContext(master, appName, batchDuration)
-    addInputStream(ssc).register
+    addInputStream(ssc).register()
     ssc.start()
     ssc.stop()
     ssc.stop()
@@ -135,7 +135,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
 
   test("stop before start and start after stop") {
     ssc = new StreamingContext(master, appName, batchDuration)
-    addInputStream(ssc).register
+    addInputStream(ssc).register()
     ssc.stop()  // stop before start should not throw exception
     ssc.start()
     ssc.stop()
@@ -147,12 +147,12 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
   test("stop only streaming context") {
     ssc = new StreamingContext(master, appName, batchDuration)
     sc = ssc.sparkContext
-    addInputStream(ssc).register
+    addInputStream(ssc).register()
     ssc.start()
-    ssc.stop(false)
+    ssc.stop(stopSparkContext = false)
     assert(sc.makeRDD(1 to 100).collect().size === 100)
     ssc = new StreamingContext(sc, batchDuration)
-    addInputStream(ssc).register
+    addInputStream(ssc).register()
     ssc.start()
     ssc.stop()
   }
@@ -167,11 +167,11 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
       var runningCount = 0
       TestReceiver.counter.set(1)
       val input = ssc.receiverStream(new TestReceiver)
-      input.count.foreachRDD(rdd => {
+      input.count().foreachRDD { rdd =>
         val count = rdd.first()
         runningCount += count.toInt
         logInfo("Count = " + count + ", Running count = " + runningCount)
-      })
+      }
       ssc.start()
       ssc.awaitTermination(500)
       ssc.stop(stopSparkContext = false, stopGracefully = true)
@@ -191,7 +191,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
   test("awaitTermination") {
     ssc = new StreamingContext(master, appName, batchDuration)
     val inputStream = addInputStream(ssc)
-    inputStream.map(x => x).register
+    inputStream.map(x => x).register()
 
     // test whether start() blocks indefinitely or not
     failAfter(2000 millis) {
@@ -215,7 +215,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
     // test whether wait exits if context is stopped
     failAfter(10000 millis) { // 10 seconds because spark takes a long time to shutdown
       new Thread() {
-        override def run {
+        override def run() {
           Thread.sleep(500)
           ssc.stop()
         }
@@ -239,8 +239,9 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
   test("awaitTermination with error in task") {
     ssc = new StreamingContext(master, appName, batchDuration)
     val inputStream = addInputStream(ssc)
-    inputStream.map(x => { throw new TestException("error in map task"); x})
-               .foreachRDD(_.count)
+    inputStream
+      .map { x => throw new TestException("error in map task"); x }
+      .foreachRDD(_.count())
 
     val exception = intercept[Exception] {
       ssc.start()
@@ -252,7 +253,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
   test("awaitTermination with error in job generation") {
     ssc = new StreamingContext(master, appName, batchDuration)
     val inputStream = addInputStream(ssc)
-    inputStream.transform(rdd => { throw new TestException("error in transform"); rdd }).register
+    inputStream.transform { rdd => throw new TestException("error in transform"); rdd }.register()
     val exception = intercept[TestException] {
       ssc.start()
       ssc.awaitTermination(5000)
@@ -265,7 +266,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
   }
 
   def addInputStream(s: StreamingContext): DStream[Int] = {
-    val input = (1 to 100).map(i => (1 to i))
+    val input = (1 to 100).map(i => 1 to i)
     val inputStream = new TestInputStream(s, input, 1)
     inputStream
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org