Posted to commits@spark.apache.org by pw...@apache.org on 2014/04/11 00:11:45 UTC
[3/4] Remove Unnecessary Whitespace
http://git-wip-us.apache.org/repos/asf/spark/blob/09bf14b7/core/src/main/scala/org/apache/spark/util/NextIterator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/NextIterator.scala b/core/src/main/scala/org/apache/spark/util/NextIterator.scala
index 8266e5e..e5c732a 100644
--- a/core/src/main/scala/org/apache/spark/util/NextIterator.scala
+++ b/core/src/main/scala/org/apache/spark/util/NextIterator.scala
@@ -19,7 +19,7 @@ package org.apache.spark.util
/** Provides a basic/boilerplate Iterator implementation. */
private[spark] abstract class NextIterator[U] extends Iterator[U] {
-
+
private var gotNext = false
private var nextValue: U = _
private var closed = false
@@ -34,7 +34,7 @@ private[spark] abstract class NextIterator[U] extends Iterator[U] {
* This convention is required because `null` may be a valid value,
* and using `Option` seems like it might create unnecessary Some/None
* instances, given some iterators might be called in a tight loop.
- *
+ *
* @return U, or set 'finished' when done
*/
protected def getNext(): U
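Editor's note (illustration, not part of the commit): the convention documented above, return a value or set `finished` when done, because null may itself be a valid value, is easiest to see in a small standalone sketch. SimpleNextIterator and LineIterator below are hypothetical names; the real NextIterator also handles resource cleanup, which is omitted here.

import java.io.{BufferedReader, StringReader}

// Simplified copy of the NextIterator pattern, for illustration only.
abstract class SimpleNextIterator[U] extends Iterator[U] {
  private var gotNext = false
  private var nextValue: U = _
  protected var finished = false

  /** Return the next value, or set `finished = true` when done. */
  protected def getNext(): U

  override def hasNext: Boolean = {
    if (!finished && !gotNext) {
      nextValue = getNext()
      gotNext = true
    }
    !finished
  }

  override def next(): U = {
    if (!hasNext) throw new NoSuchElementException("End of stream")
    gotNext = false
    nextValue
  }
}

// readLine() returns null at end of input; the end is signalled through
// `finished` rather than by returning a sentinel value.
class LineIterator(reader: BufferedReader) extends SimpleNextIterator[String] {
  override def getNext(): String = {
    val line = reader.readLine()
    if (line == null) finished = true
    line
  }
}

// Prints a, b, c.
new LineIterator(new BufferedReader(new StringReader("a\nb\nc"))).foreach(println)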
http://git-wip-us.apache.org/repos/asf/spark/blob/09bf14b7/core/src/main/scala/org/apache/spark/util/StatCounter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/StatCounter.scala b/core/src/main/scala/org/apache/spark/util/StatCounter.scala
index 732748a..d80eed4 100644
--- a/core/src/main/scala/org/apache/spark/util/StatCounter.scala
+++ b/core/src/main/scala/org/apache/spark/util/StatCounter.scala
@@ -62,10 +62,10 @@ class StatCounter(values: TraversableOnce[Double]) extends Serializable {
if (n == 0) {
mu = other.mu
m2 = other.m2
- n = other.n
+ n = other.n
maxValue = other.maxValue
minValue = other.minValue
- } else if (other.n != 0) {
+ } else if (other.n != 0) {
val delta = other.mu - mu
if (other.n * 10 < n) {
mu = mu + (delta * other.n) / (n + other.n)
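Editor's note (illustration, not part of the commit): the `mu` update in this hunk is the standard formula for merging two running means, shifting the combined mean toward the other mean in proportion to its share of the total count. A self-contained sketch, with Stats as a hypothetical stand-in for StatCounter:

case class Stats(n: Long, mu: Double) {
  def merge(other: Stats): Stats = {
    if (n == 0) other
    else if (other.n == 0) this
    else {
      val delta = other.mu - mu
      // Same update as the diff: mu + delta * other.n / (n + other.n)
      Stats(n + other.n, mu + (delta * other.n) / (n + other.n))
    }
  }
}

// Merging {1,2,3} (mean 2.0) with {4,5} (mean 4.5): combined mean is 3.0.
println(Stats(3, 2.0).merge(Stats(2, 4.5)))  // Stats(5,3.0)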
http://git-wip-us.apache.org/repos/asf/spark/blob/09bf14b7/core/src/main/scala/org/apache/spark/util/Vector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Vector.scala b/core/src/main/scala/org/apache/spark/util/Vector.scala
index 3c8f94a..1a647fa 100644
--- a/core/src/main/scala/org/apache/spark/util/Vector.scala
+++ b/core/src/main/scala/org/apache/spark/util/Vector.scala
@@ -136,7 +136,7 @@ object Vector {
def ones(length: Int) = Vector(length, _ => 1)
/**
- * Creates this [[org.apache.spark.util.Vector]] of given length containing random numbers
+ * Creates this [[org.apache.spark.util.Vector]] of given length containing random numbers
* between 0.0 and 1.0. Optional scala.util.Random number generator can be provided.
*/
def random(length: Int, random: Random = new XORShiftRandom()) =
http://git-wip-us.apache.org/repos/asf/spark/blob/09bf14b7/core/src/main/scala/org/apache/spark/util/random/XORShiftRandom.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/random/XORShiftRandom.scala b/core/src/main/scala/org/apache/spark/util/random/XORShiftRandom.scala
index 8a4cdea..7f22038 100644
--- a/core/src/main/scala/org/apache/spark/util/random/XORShiftRandom.scala
+++ b/core/src/main/scala/org/apache/spark/util/random/XORShiftRandom.scala
@@ -25,28 +25,28 @@ import scala.util.hashing.MurmurHash3
import org.apache.spark.util.Utils.timeIt
/**
- * This class implements a XORShift random number generator algorithm
+ * This class implements a XORShift random number generator algorithm
* Source:
* Marsaglia, G. (2003). Xorshift RNGs. Journal of Statistical Software, Vol. 8, Issue 14.
* @see <a href="http://www.jstatsoft.org/v08/i14/paper">Paper</a>
* This implementation is approximately 3.5 times faster than
* {@link java.util.Random java.util.Random}, partly because of the algorithm, but also due
- * to renouncing thread safety. JDK's implementation uses an AtomicLong seed, this class
+ * to renouncing thread safety. JDK's implementation uses an AtomicLong seed, this class
* uses a regular Long. We can forgo thread safety since we use a new instance of the RNG
* for each thread.
*/
private[spark] class XORShiftRandom(init: Long) extends JavaRandom(init) {
-
+
def this() = this(System.nanoTime)
private var seed = XORShiftRandom.hashSeed(init)
// we need to just override next - this will be called by nextInt, nextDouble,
// nextGaussian, nextLong, etc.
- override protected def next(bits: Int): Int = {
+ override protected def next(bits: Int): Int = {
var nextSeed = seed ^ (seed << 21)
nextSeed ^= (nextSeed >>> 35)
- nextSeed ^= (nextSeed << 4)
+ nextSeed ^= (nextSeed << 4)
seed = nextSeed
(nextSeed & ((1L << bits) -1)).asInstanceOf[Int]
}
@@ -89,7 +89,7 @@ private[spark] object XORShiftRandom {
val million = 1e6.toInt
val javaRand = new JavaRandom(seed)
val xorRand = new XORShiftRandom(seed)
-
+
// this is just to warm up the JIT - we're not timing anything
timeIt(1e6.toInt) {
javaRand.nextInt()
@@ -97,9 +97,9 @@ private[spark] object XORShiftRandom {
}
val iters = timeIt(numIters)(_)
-
+
/* Return results as a map instead of just printing to screen
- in case the user wants to do something with them */
+ in case the user wants to do something with them */
Map("javaTime" -> iters {javaRand.nextInt()},
"xorTime" -> iters {xorRand.nextInt()})
http://git-wip-us.apache.org/repos/asf/spark/blob/09bf14b7/core/src/test/scala/org/apache/spark/AkkaUtilsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/AkkaUtilsSuite.scala b/core/src/test/scala/org/apache/spark/AkkaUtilsSuite.scala
index c5f24c6..c645e4c 100644
--- a/core/src/test/scala/org/apache/spark/AkkaUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/AkkaUtilsSuite.scala
@@ -37,7 +37,7 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext {
val securityManager = new SecurityManager(conf);
val hostname = "localhost"
- val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0,
+ val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0,
conf = conf, securityManager = securityManager)
System.setProperty("spark.driver.port", boundPort.toString) // Will be cleared by LocalSparkContext
System.setProperty("spark.hostPort", hostname + ":" + boundPort)
@@ -54,14 +54,14 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext {
assert(securityManagerBad.isAuthenticationEnabled() === true)
- val (slaveSystem, _) = AkkaUtils.createActorSystem("spark-slave", hostname, 0,
+ val (slaveSystem, _) = AkkaUtils.createActorSystem("spark-slave", hostname, 0,
conf = conf, securityManager = securityManagerBad)
val slaveTracker = new MapOutputTrackerWorker(conf)
val selection = slaveSystem.actorSelection(
s"akka.tcp://spark@localhost:$boundPort/user/MapOutputTracker")
val timeout = AkkaUtils.lookupTimeout(conf)
- intercept[akka.actor.ActorNotFound] {
- slaveTracker.trackerActor = Await.result(selection.resolveOne(timeout), timeout)
+ intercept[akka.actor.ActorNotFound] {
+ slaveTracker.trackerActor = Await.result(selection.resolveOne(timeout), timeout)
}
actorSystem.shutdown()
@@ -75,7 +75,7 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext {
val securityManager = new SecurityManager(conf);
val hostname = "localhost"
- val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0,
+ val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0,
conf = conf, securityManager = securityManager)
System.setProperty("spark.driver.port", boundPort.toString) // Will be cleared by LocalSparkContext
System.setProperty("spark.hostPort", hostname + ":" + boundPort)
@@ -91,7 +91,7 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext {
badconf.set("spark.authenticate.secret", "good")
val securityManagerBad = new SecurityManager(badconf);
- val (slaveSystem, _) = AkkaUtils.createActorSystem("spark-slave", hostname, 0,
+ val (slaveSystem, _) = AkkaUtils.createActorSystem("spark-slave", hostname, 0,
conf = badconf, securityManager = securityManagerBad)
val slaveTracker = new MapOutputTrackerWorker(conf)
val selection = slaveSystem.actorSelection(
@@ -127,7 +127,7 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext {
val securityManager = new SecurityManager(conf);
val hostname = "localhost"
- val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0,
+ val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0,
conf = conf, securityManager = securityManager)
System.setProperty("spark.driver.port", boundPort.toString) // Will be cleared by LocalSparkContext
System.setProperty("spark.hostPort", hostname + ":" + boundPort)
@@ -180,7 +180,7 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext {
val securityManager = new SecurityManager(conf);
val hostname = "localhost"
- val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0,
+ val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0,
conf = conf, securityManager = securityManager)
System.setProperty("spark.driver.port", boundPort.toString) // Will be cleared by LocalSparkContext
System.setProperty("spark.hostPort", hostname + ":" + boundPort)
@@ -204,8 +204,8 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext {
val selection = slaveSystem.actorSelection(
s"akka.tcp://spark@localhost:$boundPort/user/MapOutputTracker")
val timeout = AkkaUtils.lookupTimeout(conf)
- intercept[akka.actor.ActorNotFound] {
- slaveTracker.trackerActor = Await.result(selection.resolveOne(timeout), timeout)
+ intercept[akka.actor.ActorNotFound] {
+ slaveTracker.trackerActor = Await.result(selection.resolveOne(timeout), timeout)
}
actorSystem.shutdown()
http://git-wip-us.apache.org/repos/asf/spark/blob/09bf14b7/core/src/test/scala/org/apache/spark/DriverSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/DriverSuite.scala b/core/src/test/scala/org/apache/spark/DriverSuite.scala
index 9cbdfc5..7f59bdc 100644
--- a/core/src/test/scala/org/apache/spark/DriverSuite.scala
+++ b/core/src/test/scala/org/apache/spark/DriverSuite.scala
@@ -39,7 +39,7 @@ class DriverSuite extends FunSuite with Timeouts {
failAfter(60 seconds) {
Utils.executeAndGetOutput(
Seq("./bin/spark-class", "org.apache.spark.DriverWithoutCleanup", master),
- new File(sparkHome),
+ new File(sparkHome),
Map("SPARK_TESTING" -> "1", "SPARK_HOME" -> sparkHome))
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/09bf14b7/core/src/test/scala/org/apache/spark/FileServerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/FileServerSuite.scala b/core/src/test/scala/org/apache/spark/FileServerSuite.scala
index aee9ab9..d651fbb 100644
--- a/core/src/test/scala/org/apache/spark/FileServerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileServerSuite.scala
@@ -45,7 +45,7 @@ class FileServerSuite extends FunSuite with LocalSparkContext {
val pw = new PrintWriter(textFile)
pw.println("100")
pw.close()
-
+
val jarFile = new File(tmpDir, "test.jar")
val jarStream = new FileOutputStream(jarFile)
val jar = new JarOutputStream(jarStream, new java.util.jar.Manifest())
@@ -53,7 +53,7 @@ class FileServerSuite extends FunSuite with LocalSparkContext {
val jarEntry = new JarEntry(textFile.getName)
jar.putNextEntry(jarEntry)
-
+
val in = new FileInputStream(textFile)
val buffer = new Array[Byte](10240)
var nRead = 0
http://git-wip-us.apache.org/repos/asf/spark/blob/09bf14b7/core/src/test/scala/org/apache/spark/FileSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala
index 01af940..b9b668d 100644
--- a/core/src/test/scala/org/apache/spark/FileSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -106,7 +106,7 @@ class FileSuite extends FunSuite with LocalSparkContext {
sc = new SparkContext("local", "test")
val tempDir = Files.createTempDir()
val outputDir = new File(tempDir, "output").getAbsolutePath
- val nums = sc.makeRDD(1 to 3).map(x => (new IntWritable(x), "a" * x))
+ val nums = sc.makeRDD(1 to 3).map(x => (new IntWritable(x), "a" * x))
nums.saveAsSequenceFile(outputDir)
// Try reading the output back as a SequenceFile
val output = sc.sequenceFile[IntWritable, Text](outputDir)
http://git-wip-us.apache.org/repos/asf/spark/blob/09bf14b7/core/src/test/scala/org/apache/spark/deploy/worker/WorkerWatcherSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerWatcherSuite.scala b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerWatcherSuite.scala
index 0b5ed6d..5e538d6 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerWatcherSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerWatcherSuite.scala
@@ -45,4 +45,4 @@ class WorkerWatcherSuite extends FunSuite {
actorRef.underlyingActor.receive(new DisassociatedEvent(null, otherAkkaAddress, false))
assert(!actorRef.underlyingActor.isShutDown)
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/09bf14b7/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala b/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala
index 09e35bf..e89b296 100644
--- a/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala
@@ -42,7 +42,7 @@ class WholeTextFileRecordReaderSuite extends FunSuite with BeforeAndAfterAll {
override def beforeAll() {
sc = new SparkContext("local", "test")
-
+
// Set the block size of local file system to test whether files are split right or not.
sc.hadoopConfiguration.setLong("fs.local.block.size", 32)
}
http://git-wip-us.apache.org/repos/asf/spark/blob/09bf14b7/core/src/test/scala/org/apache/spark/rdd/ParallelCollectionSplitSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/rdd/ParallelCollectionSplitSuite.scala b/core/src/test/scala/org/apache/spark/rdd/ParallelCollectionSplitSuite.scala
index a4381a8..4df3655 100644
--- a/core/src/test/scala/org/apache/spark/rdd/ParallelCollectionSplitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/ParallelCollectionSplitSuite.scala
@@ -34,14 +34,14 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers {
assert(slices(1).mkString(",") === "2")
assert(slices(2).mkString(",") === "3")
}
-
+
test("one slice") {
val data = Array(1, 2, 3)
val slices = ParallelCollectionRDD.slice(data, 1)
assert(slices.size === 1)
assert(slices(0).mkString(",") === "1,2,3")
}
-
+
test("equal slices") {
val data = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)
val slices = ParallelCollectionRDD.slice(data, 3)
@@ -50,7 +50,7 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers {
assert(slices(1).mkString(",") === "4,5,6")
assert(slices(2).mkString(",") === "7,8,9")
}
-
+
test("non-equal slices") {
val data = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
val slices = ParallelCollectionRDD.slice(data, 3)
@@ -77,14 +77,14 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers {
assert(slices(1).mkString(",") === (33 to 66).mkString(","))
assert(slices(2).mkString(",") === (67 to 100).mkString(","))
}
-
+
test("empty data") {
val data = new Array[Int](0)
val slices = ParallelCollectionRDD.slice(data, 5)
assert(slices.size === 5)
for (slice <- slices) assert(slice.size === 0)
}
-
+
test("zero slices") {
val data = Array(1, 2, 3)
intercept[IllegalArgumentException] { ParallelCollectionRDD.slice(data, 0) }
@@ -94,7 +94,7 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers {
val data = Array(1, 2, 3)
intercept[IllegalArgumentException] { ParallelCollectionRDD.slice(data, -5) }
}
-
+
test("exclusive ranges sliced into ranges") {
val data = 1 until 100
val slices = ParallelCollectionRDD.slice(data, 3)
@@ -102,7 +102,7 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers {
assert(slices.map(_.size).reduceLeft(_+_) === 99)
assert(slices.forall(_.isInstanceOf[Range]))
}
-
+
test("inclusive ranges sliced into ranges") {
val data = 1 to 100
val slices = ParallelCollectionRDD.slice(data, 3)
@@ -124,7 +124,7 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers {
assert(range.step === 1, "slice " + i + " step")
}
}
-
+
test("random array tests") {
val gen = for {
d <- arbitrary[List[Int]]
@@ -141,7 +141,7 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers {
}
check(prop)
}
-
+
test("random exclusive range tests") {
val gen = for {
a <- Gen.choose(-100, 100)
@@ -177,7 +177,7 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers {
}
check(prop)
}
-
+
test("exclusive ranges of longs") {
val data = 1L until 100L
val slices = ParallelCollectionRDD.slice(data, 3)
@@ -185,7 +185,7 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers {
assert(slices.map(_.size).reduceLeft(_+_) === 99)
assert(slices.forall(_.isInstanceOf[NumericRange[_]]))
}
-
+
test("inclusive ranges of longs") {
val data = 1L to 100L
val slices = ParallelCollectionRDD.slice(data, 3)
@@ -193,7 +193,7 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers {
assert(slices.map(_.size).reduceLeft(_+_) === 100)
assert(slices.forall(_.isInstanceOf[NumericRange[_]]))
}
-
+
test("exclusive ranges of doubles") {
val data = 1.0 until 100.0 by 1.0
val slices = ParallelCollectionRDD.slice(data, 3)
@@ -201,7 +201,7 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers {
assert(slices.map(_.size).reduceLeft(_+_) === 99)
assert(slices.forall(_.isInstanceOf[NumericRange[_]]))
}
-
+
test("inclusive ranges of doubles") {
val data = 1.0 to 100.0 by 1.0
val slices = ParallelCollectionRDD.slice(data, 3)
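Editor's note (illustration, not part of the commit): the behaviour these tests pin down is plain even slicing: slice i of numSlices over m elements covers indices [i*m/numSlices, (i+1)*m/numSlices). The evenSlice sketch below shows that arithmetic; the private ParallelCollectionRDD.slice additionally keeps Range inputs as Ranges, which the tests above also check.

def evenSlice[T](seq: Seq[T], numSlices: Int): Seq[Seq[T]] = {
  require(numSlices > 0, "Positive number of slices required")
  val m = seq.length
  (0 until numSlices).map { i =>
    seq.slice((i * m) / numSlices, ((i + 1) * m) / numSlices)
  }
}

// Matches the "equal slices" test above: 1..9 into three slices of three.
println(evenSlice(1 to 9, 3).map(_.mkString(",")))  // Vector(1,2,3, 4,5,6, 7,8,9)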
http://git-wip-us.apache.org/repos/asf/spark/blob/09bf14b7/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index dc704e0..4cdccdd 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -216,7 +216,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
test("onTaskGettingResult() called when result fetched remotely") {
val listener = new SaveTaskEvents
sc.addSparkListener(listener)
-
+
// Make a task whose result is larger than the akka frame size
System.setProperty("spark.akka.frameSize", "1")
val akkaFrameSize =
@@ -236,7 +236,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
test("onTaskGettingResult() not called when result sent directly") {
val listener = new SaveTaskEvents
sc.addSparkListener(listener)
-
+
// Make a task whose result is larger than the akka frame size
val result = sc.parallelize(Seq(1), 1).map(2 * _).reduce { case (x, y) => x }
assert(result === 2)
http://git-wip-us.apache.org/repos/asf/spark/blob/09bf14b7/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index 356e28d..2fb750d 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -264,7 +264,7 @@ class TaskSchedulerImplSuite extends FunSuite with LocalSparkContext with Loggin
test("Scheduler does not always schedule tasks on the same workers") {
sc = new SparkContext("local", "TaskSchedulerImplSuite")
- val taskScheduler = new TaskSchedulerImpl(sc)
+ val taskScheduler = new TaskSchedulerImpl(sc)
taskScheduler.initialize(new FakeSchedulerBackend)
// Need to initialize a DAGScheduler for the taskScheduler to use for callbacks.
val dagScheduler = new DAGScheduler(sc, taskScheduler) {
http://git-wip-us.apache.org/repos/asf/spark/blob/09bf14b7/core/src/test/scala/org/apache/spark/ui/UISuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ui/UISuite.scala b/core/src/test/scala/org/apache/spark/ui/UISuite.scala
index 45c3224..2f9739f 100644
--- a/core/src/test/scala/org/apache/spark/ui/UISuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/UISuite.scala
@@ -33,8 +33,8 @@ class UISuite extends FunSuite {
val server = new Server(startPort)
Try { server.start() } match {
- case Success(s) =>
- case Failure(e) =>
+ case Success(s) =>
+ case Failure(e) =>
// Either case server port is busy hence setup for test complete
}
val serverInfo1 = JettyUtils.startJettyServer(
http://git-wip-us.apache.org/repos/asf/spark/blob/09bf14b7/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala
index 439e564..d7e48e6 100644
--- a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala
@@ -69,7 +69,7 @@ object TestObject {
class TestClass extends Serializable {
var x = 5
-
+
def getX = x
def run(): Int = {
http://git-wip-us.apache.org/repos/asf/spark/blob/09bf14b7/core/src/test/scala/org/apache/spark/util/NextIteratorSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/NextIteratorSuite.scala b/core/src/test/scala/org/apache/spark/util/NextIteratorSuite.scala
index e1446cb..32d74d0 100644
--- a/core/src/test/scala/org/apache/spark/util/NextIteratorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/NextIteratorSuite.scala
@@ -32,7 +32,7 @@ class NextIteratorSuite extends FunSuite with ShouldMatchers {
i.hasNext should be === false
intercept[NoSuchElementException] { i.next() }
}
-
+
test("two iterations") {
val i = new StubIterator(Buffer(1, 2))
i.hasNext should be === true
@@ -70,7 +70,7 @@ class NextIteratorSuite extends FunSuite with ShouldMatchers {
class StubIterator(ints: Buffer[Int]) extends NextIterator[Int] {
var closeCalled = 0
-
+
override def getNext() = {
if (ints.size == 0) {
finished = true
http://git-wip-us.apache.org/repos/asf/spark/blob/09bf14b7/core/src/test/scala/org/apache/spark/util/random/XORShiftRandomSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/random/XORShiftRandomSuite.scala b/core/src/test/scala/org/apache/spark/util/random/XORShiftRandomSuite.scala
index 757476e..39199a1 100644
--- a/core/src/test/scala/org/apache/spark/util/random/XORShiftRandomSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/random/XORShiftRandomSuite.scala
@@ -29,12 +29,12 @@ class XORShiftRandomSuite extends FunSuite with ShouldMatchers {
val xorRand = new XORShiftRandom(seed)
val hundMil = 1e8.toInt
}
-
+
/*
- * This test is based on a chi-squared test for randomness. The values are hard-coded
+ * This test is based on a chi-squared test for randomness. The values are hard-coded
* so as not to create Spark's dependency on apache.commons.math3 just to call one
* method for calculating the exact p-value for a given number of random numbers
- * and bins. In case one would want to move to a full-fledged test based on
+ * and bins. In case one would want to move to a full-fledged test based on
* apache.commons.math3, the relevant class is here:
* org.apache.commons.math3.stat.inference.ChiSquareTest
*/
@@ -49,19 +49,19 @@ class XORShiftRandomSuite extends FunSuite with ShouldMatchers {
// populate bins based on modulus of the random number
times(f.hundMil) {bins(math.abs(f.xorRand.nextInt) % 10) += 1}
- /* since the seed is deterministic, until the algorithm is changed, we know the result will be
- * exactly this: Array(10004908, 9993136, 9994600, 10000744, 10000091, 10002474, 10002272,
- * 10000790, 10002286, 9998699), so the test will never fail at the prespecified (5%)
- * significance level. However, should the RNG implementation change, the test should still
- * pass at the same significance level. The chi-squared test done in R gave the following
+ /* since the seed is deterministic, until the algorithm is changed, we know the result will be
+ * exactly this: Array(10004908, 9993136, 9994600, 10000744, 10000091, 10002474, 10002272,
+ * 10000790, 10002286, 9998699), so the test will never fail at the prespecified (5%)
+ * significance level. However, should the RNG implementation change, the test should still
+ * pass at the same significance level. The chi-squared test done in R gave the following
* results:
* > chisq.test(c(10004908, 9993136, 9994600, 10000744, 10000091, 10002474, 10002272,
* 10000790, 10002286, 9998699))
* Chi-squared test for given probabilities
- * data: c(10004908, 9993136, 9994600, 10000744, 10000091, 10002474, 10002272, 10000790,
+ * data: c(10004908, 9993136, 9994600, 10000744, 10000091, 10002474, 10002272, 10000790,
* 10002286, 9998699)
* X-squared = 11.975, df = 9, p-value = 0.2147
- * Note that the p-value was ~0.22. The test will fail if alpha < 0.05, which for 100 million
+ * Note that the p-value was ~0.22. The test will fail if alpha < 0.05, which for 100 million
* random numbers
* and 10 bins will happen at X-squared of ~16.9196. So, the test will fail if X-squared
* is greater than or equal to that number.
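Editor's note (illustration, not part of the commit): the X-squared value quoted from R in the comment above can be reproduced directly. With 10 equiprobable bins over 10^8 draws, the expected count per bin is 10^7, and the statistic is the sum over bins of (observed - expected)^2 / expected:

val observed = Seq(10004908L, 9993136L, 9994600L, 10000744L, 10000091L,
                   10002474L, 10002272L, 10000790L, 10002286L, 9998699L)
val expected = observed.sum.toDouble / observed.size  // 1e7
val chiSquared = observed.map(o => math.pow(o - expected, 2) / expected).sum
println(f"X-squared = $chiSquared%.3f")  // 11.975, matching the R output above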
http://git-wip-us.apache.org/repos/asf/spark/blob/09bf14b7/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
----------------------------------------------------------------------
diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
index 41e813d..1204cfb 100644
--- a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
+++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
@@ -48,41 +48,41 @@ import org.apache.spark.streaming.dstream._
* @param storageLevel RDD storage level.
*/
-private[streaming]
+private[streaming]
class MQTTInputDStream[T: ClassTag](
@transient ssc_ : StreamingContext,
brokerUrl: String,
topic: String,
storageLevel: StorageLevel
) extends NetworkInputDStream[T](ssc_) with Logging {
-
+
def getReceiver(): NetworkReceiver[T] = {
new MQTTReceiver(brokerUrl, topic, storageLevel).asInstanceOf[NetworkReceiver[T]]
}
}
-private[streaming]
+private[streaming]
class MQTTReceiver(brokerUrl: String,
topic: String,
storageLevel: StorageLevel
) extends NetworkReceiver[Any] {
lazy protected val blockGenerator = new BlockGenerator(storageLevel)
-
+
def onStop() {
blockGenerator.stop()
}
-
+
def onStart() {
blockGenerator.start()
- // Set up persistence for messages
+ // Set up persistence for messages
var peristance: MqttClientPersistence = new MemoryPersistence()
// Initializing Mqtt Client specifying brokerUrl, clientID and MqttClientPersistance
var client: MqttClient = new MqttClient(brokerUrl, MqttClient.generateClientId(), peristance)
- // Connect to MqttBroker
+ // Connect to MqttBroker
client.connect()
// Subscribe to Mqtt topic
@@ -91,7 +91,7 @@ class MQTTReceiver(brokerUrl: String,
// Callback automatically triggers as and when new message arrives on specified topic
var callback: MqttCallback = new MqttCallback() {
- // Handles Mqtt message
+ // Handles Mqtt message
override def messageArrived(arg0: String, arg1: MqttMessage) {
blockGenerator += new String(arg1.getPayload())
}
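Editor's note (usage sketch, not part of the commit): an application would normally reach this receiver through the module's MQTTUtils.createStream helper rather than instantiating MQTTReceiver directly. The broker URL, topic, and app name below are placeholders:

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.mqtt.MQTTUtils

val conf = new SparkConf().setAppName("mqtt-demo").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(1))
// Each MQTT message payload arrives as one String in the stream.
val lines = MQTTUtils.createStream(ssc, "tcp://localhost:1883", "demo/topic",
  StorageLevel.MEMORY_ONLY)
lines.print()
ssc.start()
ssc.awaitTermination()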
http://git-wip-us.apache.org/repos/asf/spark/blob/09bf14b7/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
----------------------------------------------------------------------
diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
index 3316b6d..843a4a7 100644
--- a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
+++ b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
@@ -31,7 +31,7 @@ import org.apache.spark.storage.StorageLevel
* @constructor create a new Twitter stream using the supplied Twitter4J authentication credentials.
* An optional set of string filters can be used to restrict the set of tweets. The Twitter API is
* such that this may return a sampled subset of all tweets during each interval.
-*
+*
* If no Authorization object is provided, initializes OAuth authorization using the system
* properties twitter4j.oauth.consumerKey, .consumerSecret, .accessToken and .accessTokenSecret.
*/
@@ -42,13 +42,13 @@ class TwitterInputDStream(
filters: Seq[String],
storageLevel: StorageLevel
) extends NetworkInputDStream[Status](ssc_) {
-
+
private def createOAuthAuthorization(): Authorization = {
new OAuthAuthorization(new ConfigurationBuilder().build())
}
private val authorization = twitterAuth.getOrElse(createOAuthAuthorization())
-
+
override def getReceiver(): NetworkReceiver[Status] = {
new TwitterReceiver(authorization, filters, storageLevel)
}
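Editor's note (illustration, not part of the commit): the fallback documented above means an application that passes no Authorization can configure OAuth entirely through system properties before creating the stream. The credential values below are placeholders:

// twitter4j's ConfigurationBuilder picks these up when
// createOAuthAuthorization() runs with no explicit Authorization.
System.setProperty("twitter4j.oauth.consumerKey", "<consumerKey>")
System.setProperty("twitter4j.oauth.consumerSecret", "<consumerSecret>")
System.setProperty("twitter4j.oauth.accessToken", "<accessToken>")
System.setProperty("twitter4j.oauth.accessTokenSecret", "<accessTokenSecret>")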
http://git-wip-us.apache.org/repos/asf/spark/blob/09bf14b7/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
index 377d9d6..5635287 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
@@ -172,7 +172,7 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali
"EdgeDirection.Either instead.")
}
}
-
+
/**
* Join the vertices with an RDD and then apply a function from the
* the vertex and RDD entry to a new vertex value. The input table
http://git-wip-us.apache.org/repos/asf/spark/blob/09bf14b7/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala
----------------------------------------------------------------------
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala
index 6386306..a467ca1 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala
@@ -55,7 +55,7 @@ class GraphOpsSuite extends FunSuite with LocalSparkContext {
}
}
}
-
+
test ("filter") {
withSpark { sc =>
val n = 5
http://git-wip-us.apache.org/repos/asf/spark/blob/09bf14b7/mllib/src/main/scala/org/apache/spark/mllib/optimization/Optimizer.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/Optimizer.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/Optimizer.scala
index e41d9bb..7f6d945 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/Optimizer.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/Optimizer.scala
@@ -30,7 +30,7 @@ import org.apache.spark.mllib.linalg.Vector
trait Optimizer extends Serializable {
/**
- * Solve the provided convex optimization problem.
+ * Solve the provided convex optimization problem.
*/
def optimize(data: RDD[(Double, Vector)], initialWeights: Vector): Vector
}
http://git-wip-us.apache.org/repos/asf/spark/blob/09bf14b7/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala
index 3bd0017..d969e7a 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala
@@ -26,7 +26,7 @@ import org.apache.spark.mllib.optimization._
import org.apache.spark.mllib.linalg.{Vectors, Vector}
/**
- * GeneralizedLinearModel (GLM) represents a model trained using
+ * GeneralizedLinearModel (GLM) represents a model trained using
* GeneralizedLinearAlgorithm. GLMs consist of a weight vector and
* an intercept.
*
@@ -38,7 +38,7 @@ abstract class GeneralizedLinearModel(val weights: Vector, val intercept: Double
/**
* Predict the result given a data point and the weights learned.
- *
+ *
* @param dataMatrix Row vector containing the features for this data point
* @param weightMatrix Column vector containing the weights of the model
* @param intercept Intercept of the model.
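Editor's note (illustration, not part of the commit): for the simplest GLM, linear regression, the predict contract documented above reduces to a dot product plus the intercept. The predictPoint sketch below is hypothetical, with plain arrays standing in for the mllib Vector type:

def predictPoint(features: Array[Double],
                 weights: Array[Double],
                 intercept: Double): Double = {
  require(features.length == weights.length, "dimension mismatch")
  var dot = 0.0
  var i = 0
  while (i < features.length) { dot += features(i) * weights(i); i += 1 }
  dot + intercept
}

println(predictPoint(Array(1.0, 2.0), Array(0.5, 0.25), 0.1))  // 1.1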
http://git-wip-us.apache.org/repos/asf/spark/blob/09bf14b7/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
----------------------------------------------------------------------
diff --git a/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala b/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
index a30dcfd..687e85c 100644
--- a/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
@@ -35,7 +35,7 @@ import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.Opcodes._
* A ClassLoader that reads classes from a Hadoop FileSystem or HTTP URI,
* used to load classes defined by the interpreter when the REPL is used.
* Allows the user to specify if user class path should be first
- */
+ */
class ExecutorClassLoader(classUri: String, parent: ClassLoader,
userClassPathFirst: Boolean) extends ClassLoader {
val uri = new URI(classUri)
@@ -94,7 +94,7 @@ class ExecutorClassLoader(classUri: String, parent: ClassLoader,
case e: Exception => None
}
}
-
+
def readAndTransformClass(name: String, in: InputStream): Array[Byte] = {
if (name.startsWith("line") && name.endsWith("$iw$")) {
// Class seems to be an interpreter "wrapper" object storing a val or var.
http://git-wip-us.apache.org/repos/asf/spark/blob/09bf14b7/repl/src/main/scala/org/apache/spark/repl/SparkImports.scala
----------------------------------------------------------------------
diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkImports.scala b/repl/src/main/scala/org/apache/spark/repl/SparkImports.scala
index 8f61a5e..419796b 100644
--- a/repl/src/main/scala/org/apache/spark/repl/SparkImports.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/SparkImports.scala
@@ -187,7 +187,7 @@ trait SparkImports {
if (currentImps contains imv) addWrapper()
val objName = req.lineRep.readPath
val valName = "$VAL" + newValId();
-
+
if(!code.toString.endsWith(".`" + imv + "`;\n")) { // Which means already imported
code.append("val " + valName + " = " + objName + ".INSTANCE;\n")
code.append("import " + valName + req.accessPath + ".`" + imv + "`;\n")
http://git-wip-us.apache.org/repos/asf/spark/blob/09bf14b7/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
index 1711849..1f3fab0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
@@ -28,7 +28,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression {
override def toString = s"CAST($child, $dataType)"
type EvaluatedType = Any
-
+
def nullOrCast[T](a: Any, func: T => Any): Any = if(a == null) {
null
} else {
@@ -40,7 +40,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression {
case BinaryType => nullOrCast[Array[Byte]](_, new String(_, "UTF-8"))
case _ => nullOrCast[Any](_, _.toString)
}
-
+
// BinaryConverter
def castToBinary: Any => Any = child.dataType match {
case StringType => nullOrCast[String](_, _.getBytes("UTF-8"))
@@ -58,7 +58,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression {
case DoubleType => nullOrCast[Double](_, _ != 0)
case FloatType => nullOrCast[Float](_, _ != 0)
}
-
+
// TimestampConverter
def castToTimestamp: Any => Any = child.dataType match {
case StringType => nullOrCast[String](_, s => {
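Editor's note (illustration, not part of the commit): nullOrCast above is a null-propagating helper, null in means null out, otherwise the conversion runs on the value cast to its source type. A standalone sketch, mirroring the StringType case of castToBinary:

def nullOrCast[T](a: Any, func: T => Any): Any =
  if (a == null) null else func(a.asInstanceOf[T])

println(nullOrCast[String](null, _.getBytes("UTF-8")))  // null
println(nullOrCast[String]("ab", _.getBytes("UTF-8"))
  .asInstanceOf[Array[Byte]].mkString(","))             // 97,98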
http://git-wip-us.apache.org/repos/asf/spark/blob/09bf14b7/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
index 8a1db8e..dd9332a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
@@ -86,7 +86,7 @@ abstract class Expression extends TreeNode[Expression] {
}
/**
- * Evaluation helper function for 2 Numeric children expressions. Those expressions are supposed
+ * Evaluation helper function for 2 Numeric children expressions. Those expressions are supposed
* to be in the same data type, and also the return type.
* Either one of the expressions result is null, the evaluation result should be null.
*/
@@ -120,7 +120,7 @@ abstract class Expression extends TreeNode[Expression] {
}
/**
- * Evaluation helper function for 2 Fractional children expressions. Those expressions are
+ * Evaluation helper function for 2 Fractional children expressions. Those expressions are
* supposed to be in the same data type, and also the return type.
* Either one of the expressions result is null, the evaluation result should be null.
*/
@@ -153,7 +153,7 @@ abstract class Expression extends TreeNode[Expression] {
}
/**
- * Evaluation helper function for 2 Integral children expressions. Those expressions are
+ * Evaluation helper function for 2 Integral children expressions. Those expressions are
* supposed to be in the same data type, and also the return type.
* Either one of the expressions result is null, the evaluation result should be null.
*/
@@ -186,12 +186,12 @@ abstract class Expression extends TreeNode[Expression] {
}
/**
- * Evaluation helper function for 2 Comparable children expressions. Those expressions are
+ * Evaluation helper function for 2 Comparable children expressions. Those expressions are
* supposed to be in the same data type, and the return type should be Integer:
* Negative value: 1st argument less than 2nd argument
* Zero: 1st argument equals 2nd argument
* Positive value: 1st argument greater than 2nd argument
- *
+ *
* Either one of the expressions result is null, the evaluation result should be null.
*/
@inline
@@ -213,7 +213,7 @@ abstract class Expression extends TreeNode[Expression] {
null
} else {
e1.dataType match {
- case i: NativeType =>
+ case i: NativeType =>
f.asInstanceOf[(Ordering[i.JvmType], i.JvmType, i.JvmType) => Boolean](
i.ordering, evalE1.asInstanceOf[i.JvmType], evalE2.asInstanceOf[i.JvmType])
case other => sys.error(s"Type $other does not support ordered operations")
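Editor's note (illustration, not part of the commit): the four helper families documented above (Numeric, Fractional, Integral, Comparable) share one contract: if either child expression evaluates to null the result is null, and otherwise the typed operation runs on both values. A sketch of that contract, with Option standing in for SQL null and Int for the shared data type:

def n2(e1: Option[Int], e2: Option[Int])(f: (Int, Int) => Int): Option[Int] =
  for (v1 <- e1; v2 <- e2) yield f(v1, v2)

println(n2(Some(6), Some(7))(_ * _))  // Some(42)
println(n2(Some(6), None)(_ * _))     // None: null propagates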
http://git-wip-us.apache.org/repos/asf/spark/blob/09bf14b7/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringOperations.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringOperations.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringOperations.scala
index a27c71d..ddc16ce 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringOperations.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringOperations.scala
@@ -28,19 +28,19 @@ trait StringRegexExpression {
self: BinaryExpression =>
type EvaluatedType = Any
-
+
def escape(v: String): String
def matches(regex: Pattern, str: String): Boolean
-
+
def nullable: Boolean = true
def dataType: DataType = BooleanType
-
- // try cache the pattern for Literal
+
+ // try cache the pattern for Literal
private lazy val cache: Pattern = right match {
case x @ Literal(value: String, StringType) => compile(value)
case _ => null
}
-
+
protected def compile(str: String): Pattern = if(str == null) {
null
} else {
@@ -49,7 +49,7 @@ trait StringRegexExpression {
}
protected def pattern(str: String) = if(cache == null) compile(str) else cache
-
+
override def eval(input: Row): Any = {
val l = left.eval(input)
if (l == null) {
@@ -73,11 +73,11 @@ trait StringRegexExpression {
/**
* Simple RegEx pattern matching function
*/
-case class Like(left: Expression, right: Expression)
+case class Like(left: Expression, right: Expression)
extends BinaryExpression with StringRegexExpression {
-
+
def symbol = "LIKE"
-
+
// replace the _ with .{1} exactly match 1 time of any character
// replace the % with .*, match 0 or more times with any character
override def escape(v: String) = {
@@ -98,19 +98,19 @@ case class Like(left: Expression, right: Expression)
sb.append(Pattern.quote(Character.toString(n)));
}
}
-
+
i += 1
}
-
+
sb.toString()
}
-
+
override def matches(regex: Pattern, str: String): Boolean = regex.matcher(str).matches()
}
-case class RLike(left: Expression, right: Expression)
+case class RLike(left: Expression, right: Expression)
extends BinaryExpression with StringRegexExpression {
-
+
def symbol = "RLIKE"
override def escape(v: String): String = v
override def matches(regex: Pattern, str: String): Boolean = regex.matcher(str).find(0)
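Editor's note (illustration, not part of the commit): the escape rules in the comments above ('_' becomes '.{1}', '%' becomes '.*', everything else is quoted) are easy to reproduce in miniature. The likeToRegex sketch below condenses the character-by-character loop shown in the hunk:

import java.util.regex.Pattern

def likeToRegex(v: String): String =
  v.map {
    case '%' => ".*"     // zero or more of any character
    case '_' => ".{1}"   // exactly one of any character
    case c   => Pattern.quote(c.toString)
  }.mkString

val p = Pattern.compile(likeToRegex("a_c%"))
println(p.matcher("abcdef").matches())  // true
println(p.matcher("ac").matches())      // false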
http://git-wip-us.apache.org/repos/asf/spark/blob/09bf14b7/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala
index cdeb01a..da34bd3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala
@@ -55,9 +55,9 @@ case object BooleanType extends NativeType {
case object TimestampType extends NativeType {
type JvmType = Timestamp
-
+
@transient lazy val tag = typeTag[JvmType]
-
+
val ordering = new Ordering[JvmType] {
def compare(x: Timestamp, y: Timestamp) = x.compareTo(y)
}
http://git-wip-us.apache.org/repos/asf/spark/blob/09bf14b7/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvaluationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvaluationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvaluationSuite.scala
index 888a19d..2cd0d2b 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvaluationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvaluationSuite.scala
@@ -144,7 +144,7 @@ class ExpressionEvaluationSuite extends FunSuite {
checkEvaluation("abc" like "b%", false)
checkEvaluation("abc" like "bc%", false)
}
-
+
test("LIKE Non-literal Regular Expression") {
val regEx = 'a.string.at(0)
checkEvaluation("abcd" like regEx, null, new GenericRow(Array[Any](null)))
@@ -164,7 +164,7 @@ class ExpressionEvaluationSuite extends FunSuite {
test("RLIKE literal Regular Expression") {
checkEvaluation("abdef" rlike "abdef", true)
checkEvaluation("abbbbc" rlike "a.*c", true)
-
+
checkEvaluation("fofo" rlike "^fo", true)
checkEvaluation("fo\no" rlike "^fo\no$", true)
checkEvaluation("Bn" rlike "^Ba*n", true)
@@ -196,9 +196,9 @@ class ExpressionEvaluationSuite extends FunSuite {
evaluate("abbbbc" rlike regEx, new GenericRow(Array[Any]("**")))
}
}
-
+
test("data type casting") {
-
+
val sts = "1970-01-01 00:00:01.0"
val ts = Timestamp.valueOf(sts)
@@ -236,7 +236,7 @@ class ExpressionEvaluationSuite extends FunSuite {
checkEvaluation("23" cast ShortType, 23)
checkEvaluation("2012-12-11" cast DoubleType, null)
checkEvaluation(Literal(123) cast IntegerType, 123)
-
+
intercept[Exception] {evaluate(Literal(1) cast BinaryType, null)}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/09bf14b7/sql/core/src/test/scala/org/apache/spark/sql/ScalaReflectionRelationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ScalaReflectionRelationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ScalaReflectionRelationSuite.scala
index 65eae33..1cbf973 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ScalaReflectionRelationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ScalaReflectionRelationSuite.scala
@@ -56,4 +56,4 @@ class ScalaReflectionRelationSuite extends FunSuite {
val result = sql("SELECT data FROM reflectBinary").collect().head(0).asInstanceOf[Array[Byte]]
assert(result.toSeq === Seq[Byte](1))
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/09bf14b7/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 93023e8..ac56ff7 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -59,7 +59,7 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
}
}
-private[streaming]
+private[streaming]
object Checkpoint extends Logging {
val PREFIX = "checkpoint-"
val REGEX = (PREFIX + """([\d]+)([\w\.]*)""").r
@@ -79,7 +79,7 @@ object Checkpoint extends Logging {
def sortFunc(path1: Path, path2: Path): Boolean = {
val (time1, bk1) = path1.getName match { case REGEX(x, y) => (x.toLong, !y.isEmpty) }
val (time2, bk2) = path2.getName match { case REGEX(x, y) => (x.toLong, !y.isEmpty) }
- (time1 < time2) || (time1 == time2 && bk1)
+ (time1 < time2) || (time1 == time2 && bk1)
}
val path = new Path(checkpointDir)
@@ -95,7 +95,7 @@ object Checkpoint extends Logging {
}
} else {
logInfo("Checkpoint directory " + path + " does not exist")
- Seq.empty
+ Seq.empty
}
}
}
@@ -160,7 +160,7 @@ class CheckpointWriter(
})
}
- // All done, print success
+ // All done, print success
val finishTime = System.currentTimeMillis()
logInfo("Checkpoint for time " + checkpointTime + " saved to file '" + checkpointFile +
"', took " + bytes.length + " bytes and " + (finishTime - startTime) + " ms")
@@ -227,14 +227,14 @@ object CheckpointReader extends Logging {
{
val checkpointPath = new Path(checkpointDir)
def fs = checkpointPath.getFileSystem(hadoopConf)
-
- // Try to find the checkpoint files
+
+ // Try to find the checkpoint files
val checkpointFiles = Checkpoint.getCheckpointFiles(checkpointDir, fs).reverse
if (checkpointFiles.isEmpty) {
return None
}
- // Try to read the checkpoint files in the order
+ // Try to read the checkpoint files in the order
logInfo("Checkpoint files found: " + checkpointFiles.mkString(","))
val compressionCodec = CompressionCodec.createCodec(conf)
checkpointFiles.foreach(file => {
http://git-wip-us.apache.org/repos/asf/spark/blob/09bf14b7/streaming/src/main/scala/org/apache/spark/streaming/Interval.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Interval.scala b/streaming/src/main/scala/org/apache/spark/streaming/Interval.scala
index 16479a0..ad4f3fd 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Interval.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Interval.scala
@@ -20,11 +20,11 @@ package org.apache.spark.streaming
private[streaming]
class Interval(val beginTime: Time, val endTime: Time) {
def this(beginMs: Long, endMs: Long) = this(new Time(beginMs), new Time(endMs))
-
+
def duration(): Duration = endTime - beginTime
def + (time: Duration): Interval = {
- new Interval(beginTime + time, endTime + time)
+ new Interval(beginTime + time, endTime + time)
}
def - (time: Duration): Interval = {
@@ -40,9 +40,9 @@ class Interval(val beginTime: Time, val endTime: Time) {
}
def <= (that: Interval) = (this < that || this == that)
-
+
def > (that: Interval) = !(this <= that)
-
+
def >= (that: Interval) = !(this < that)
override def toString = "[" + beginTime + ", " + endTime + "]"
http://git-wip-us.apache.org/repos/asf/spark/blob/09bf14b7/streaming/src/main/scala/org/apache/spark/streaming/Time.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Time.scala b/streaming/src/main/scala/org/apache/spark/streaming/Time.scala
index 2678334..6a6b00a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Time.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Time.scala
@@ -32,7 +32,7 @@ case class Time(private val millis: Long) {
def <= (that: Time): Boolean = (this.millis <= that.millis)
def > (that: Time): Boolean = (this.millis > that.millis)
-
+
def >= (that: Time): Boolean = (this.millis >= that.millis)
def + (that: Duration): Time = new Time(millis + that.milliseconds)
@@ -43,7 +43,7 @@ case class Time(private val millis: Long) {
def floor(that: Duration): Time = {
val t = that.milliseconds
- val m = math.floor(this.millis / t).toLong
+ val m = math.floor(this.millis / t).toLong
new Time(m * t)
}
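Editor's note (worked example, not part of the commit): the floor change above only trims trailing whitespace, but the logic merits a concrete check. Dividing by the duration, truncating, and multiplying back snaps a time down to the nearest period boundary; SimpleTime below is a hypothetical stand-in for Time, valid for non-negative times:

case class SimpleTime(millis: Long) {
  // Integer division truncates; multiplying back restores the boundary.
  def floor(periodMs: Long): SimpleTime = SimpleTime((millis / periodMs) * periodMs)
}

println(SimpleTime(42500L).floor(10000L))  // SimpleTime(40000)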
http://git-wip-us.apache.org/repos/asf/spark/blob/09bf14b7/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
index 903e3f3..f33c0ce 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
@@ -51,7 +51,7 @@ class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T])
.map(x => (x._1, x._2.getCheckpointFile.get))
logDebug("Current checkpoint files:\n" + checkpointFiles.toSeq.mkString("\n"))
- // Add the checkpoint files to the data to be serialized
+ // Add the checkpoint files to the data to be serialized
if (!checkpointFiles.isEmpty) {
currentCheckpointFiles.clear()
currentCheckpointFiles ++= checkpointFiles
http://git-wip-us.apache.org/repos/asf/spark/blob/09bf14b7/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index 8a60516..e878285 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -232,7 +232,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
}
logDebug("Accepted " + path)
} catch {
- case fnfe: java.io.FileNotFoundException =>
+ case fnfe: java.io.FileNotFoundException =>
logWarning("Error finding new files", fnfe)
reset()
return false
http://git-wip-us.apache.org/repos/asf/spark/blob/09bf14b7/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala
index 97325f8..6376cff 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala
@@ -31,11 +31,11 @@ class QueueInputDStream[T: ClassTag](
oneAtATime: Boolean,
defaultRDD: RDD[T]
) extends InputDStream[T](ssc) {
-
+
override def start() { }
-
+
override def stop() { }
-
+
override def compute(validTime: Time): Option[RDD[T]] = {
val buffer = new ArrayBuffer[RDD[T]]()
if (oneAtATime && queue.size > 0) {
@@ -55,5 +55,5 @@ class QueueInputDStream[T: ClassTag](
None
}
}
-
+
}
http://git-wip-us.apache.org/repos/asf/spark/blob/09bf14b7/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala b/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala
index 44eb275..f5984d0 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala
@@ -47,7 +47,7 @@ object ReceiverSupervisorStrategy {
* the API for pushing received data into Spark Streaming for being processed.
*
* Find more details at: http://spark.apache.org/docs/latest/streaming-custom-receivers.html
- *
+ *
* @example {{{
* class MyActor extends Actor with Receiver{
* def receive {
http://git-wip-us.apache.org/repos/asf/spark/blob/09bf14b7/streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala
index c5ef2cc..39145a3 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala
@@ -19,34 +19,34 @@ package org.apache.spark.streaming.util
private[streaming]
trait Clock {
- def currentTime(): Long
+ def currentTime(): Long
def waitTillTime(targetTime: Long): Long
}
private[streaming]
class SystemClock() extends Clock {
-
+
val minPollTime = 25L
-
+
def currentTime(): Long = {
System.currentTimeMillis()
- }
-
+ }
+
def waitTillTime(targetTime: Long): Long = {
var currentTime = 0L
currentTime = System.currentTimeMillis()
-
+
var waitTime = targetTime - currentTime
if (waitTime <= 0) {
return currentTime
}
-
+
val pollTime = {
if (waitTime / 10.0 > minPollTime) {
(waitTime / 10.0).toLong
} else {
- minPollTime
- }
+ minPollTime
+ }
}
while (true) {
@@ -55,7 +55,7 @@ class SystemClock() extends Clock {
if (waitTime <= 0) {
return currentTime
}
- val sleepTime =
+ val sleepTime =
if (waitTime < pollTime) {
waitTime
} else {
@@ -69,7 +69,7 @@ class SystemClock() extends Clock {
private[streaming]
class ManualClock() extends Clock {
-
+
var time = 0L
def currentTime() = time
@@ -85,13 +85,13 @@ class ManualClock() extends Clock {
this.synchronized {
time += timeToAdd
this.notifyAll()
- }
+ }
}
def waitTillTime(targetTime: Long): Long = {
this.synchronized {
while (time < targetTime) {
this.wait(100)
- }
+ }
}
currentTime()
}
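Editor's note (illustration, not part of the commit): ManualClock above is the test-friendly counterpart of SystemClock, in that time advances only when told to and waiters are woken via notifyAll(). SimpleManualClock below is a hypothetical condensed version of the same idea:

class SimpleManualClock {
  private var time = 0L
  def currentTime: Long = this.synchronized { time }
  def advance(ms: Long): Unit = this.synchronized { time += ms; notifyAll() }
  def waitTillTime(target: Long): Long = this.synchronized {
    while (time < target) wait(100)  // loop guards against spurious wakeups
    time
  }
}

val clock = new SimpleManualClock
new Thread(new Runnable {
  def run(): Unit = { Thread.sleep(50); clock.advance(1000) }
}).start()
println(clock.waitTillTime(1000))  // prints 1000 once advance() fires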
http://git-wip-us.apache.org/repos/asf/spark/blob/09bf14b7/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala
index 07021eb..bd1df55 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala
@@ -25,8 +25,8 @@ import scala.collection.JavaConversions.mapAsScalaMap
private[streaming]
object RawTextHelper {
- /**
- * Splits lines and counts the words in them using specialized object-to-long hashmap
+ /**
+ * Splits lines and counts the words in them using specialized object-to-long hashmap
* (to avoid boxing-unboxing overhead of Long in java/scala HashMap)
*/
def splitAndCountPartitions(iter: Iterator[String]): Iterator[(String, Long)] = {
@@ -55,13 +55,13 @@ object RawTextHelper {
map.toIterator.map{case (k, v) => (k, v)}
}
- /**
+ /**
* Gets the top k words in terms of word counts. Assumes that each word exists only once
* in the `data` iterator (that is, the counts have been reduced).
*/
def topK(data: Iterator[(String, Long)], k: Int): Iterator[(String, Long)] = {
val taken = new Array[(String, Long)](k)
-
+
var i = 0
var len = 0
var done = false
@@ -93,7 +93,7 @@ object RawTextHelper {
}
taken.toIterator
}
-
+
/**
* Warms up the SparkContext in master and slave by running tasks to force JIT kick in
* before real workload starts.
@@ -106,11 +106,11 @@ object RawTextHelper {
.count()
}
}
-
- def add(v1: Long, v2: Long) = (v1 + v2)
- def subtract(v1: Long, v2: Long) = (v1 - v2)
+ def add(v1: Long, v2: Long) = (v1 + v2)
+
+ def subtract(v1: Long, v2: Long) = (v1 - v2)
- def max(v1: Long, v2: Long) = math.max(v1, v2)
+ def max(v1: Long, v2: Long) = math.max(v1, v2)
}
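Editor's note (illustration, not part of the commit): the two helpers documented above, per-partition word counting and topK over already-reduced counts, look like this in miniature. A plain mutable map stands in for the specialized object-to-long hashmap, and a sort stands in for topK's fixed k-slot array:

import scala.collection.mutable

def splitAndCount(lines: Iterator[String]): Iterator[(String, Long)] = {
  val counts = mutable.Map.empty[String, Long].withDefaultValue(0L)
  for (line <- lines; word <- line.split(" ") if word.nonEmpty)
    counts(word) += 1L
  counts.iterator
}

// Assumes each word appears at most once in `data` (counts already reduced).
def topK(data: Iterator[(String, Long)], k: Int): Iterator[(String, Long)] =
  data.toSeq.sortBy(-_._2).take(k).iterator

println(topK(splitAndCount(Iterator("a b a", "b a c")), 2).toList)
// List((a,3), (b,2))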
http://git-wip-us.apache.org/repos/asf/spark/blob/09bf14b7/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala
index f71938a..e016377 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala
@@ -22,10 +22,10 @@ import org.apache.spark.Logging
private[streaming]
class RecurringTimer(clock: Clock, period: Long, callback: (Long) => Unit, name: String)
extends Logging {
-
+
private val thread = new Thread("RecurringTimer - " + name) {
setDaemon(true)
- override def run() { loop }
+ override def run() { loop }
}
@volatile private var prevTime = -1L
@@ -104,11 +104,11 @@ class RecurringTimer(clock: Clock, period: Long, callback: (Long) => Unit, name:
private[streaming]
object RecurringTimer {
-
+
def main(args: Array[String]) {
var lastRecurTime = 0L
val period = 1000
-
+
def onRecur(time: Long) {
val currentTime = System.currentTimeMillis()
println("" + currentTime + ": " + (currentTime - lastRecurTime))
http://git-wip-us.apache.org/repos/asf/spark/blob/09bf14b7/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index 13fa648..a0b1bbc 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -1673,7 +1673,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
@Test
public void testSocketString() {
-
+
class Converter implements Function<InputStream, Iterable<String>> {
public Iterable<String> call(InputStream in) throws IOException {
BufferedReader reader = new BufferedReader(new InputStreamReader(in));