You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2014/01/13 06:31:31 UTC
[1/6] Remove simple redundant return statement for Scala
methods/functions:
Updated Branches:
refs/heads/master 405bfe86e -> 0ab505a29
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index be323d7..efeee31 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -205,9 +205,10 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf)
}
//check for ports
if (srcUri.getPort() != dstUri.getPort()) {
- return false
+ false
+ } else {
+ true
}
- return true
}
/** Copy the file into HDFS if needed. */
[5/6] git commit: Address code review concerns and comments.
Posted by pw...@apache.org.
Address code review concerns and comments.
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/5a8abfb7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/5a8abfb7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/5a8abfb7
Branch: refs/heads/master
Commit: 5a8abfb70efd89ec4120c7f78596d8b32a9f4f3d
Parents: f1c5eca
Author: Henry Saputra <hs...@apache.org>
Authored: Sun Jan 12 19:15:09 2014 -0800
Committer: Henry Saputra <hs...@apache.org>
Committed: Sun Jan 12 19:15:09 2014 -0800
----------------------------------------------------------------------
core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala | 2 +-
.../src/main/scala/org/apache/spark/api/python/PythonRDD.scala | 3 ++-
core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala | 6 +++---
.../scala/org/apache/spark/storage/BlockManagerWorker.scala | 6 +++---
.../scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala | 3 ++-
.../test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala | 5 +----
.../src/main/scala/org/apache/spark/deploy/yarn/Client.scala | 6 +++---
.../src/main/scala/org/apache/spark/deploy/yarn/Client.scala | 6 +++---
8 files changed, 18 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/5a8abfb7/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
index bba873a..4e63117 100644
--- a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
+++ b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
@@ -189,7 +189,7 @@ object SparkHadoopWriter {
if (path == null) {
throw new IllegalArgumentException("Output path is null")
}
- var outputPath = new Path(path)
+ val outputPath = new Path(path)
val fs = outputPath.getFileSystem(conf)
if (outputPath == null || fs == null) {
throw new IllegalArgumentException("Incorrectly formatted output path")
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/5a8abfb7/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 8830de7..82527fe 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -95,7 +95,7 @@ private[spark] class PythonRDD[T: ClassTag](
// Return an iterator that read lines from the process's stdout
val stream = new DataInputStream(new BufferedInputStream(worker.getInputStream, bufferSize))
- new Iterator[Array[Byte]] {
+ val stdoutIterator = new Iterator[Array[Byte]] {
def next(): Array[Byte] = {
val obj = _nextObj
if (hasNext) {
@@ -156,6 +156,7 @@ private[spark] class PythonRDD[T: ClassTag](
def hasNext = _nextObj.length != 0
}
+ stdoutIterator
}
val asJavaRDD : JavaRDD[Array[Byte]] = JavaRDD.fromRDD(this)
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/5a8abfb7/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
index a5394a2..cefcc3d 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
@@ -295,9 +295,9 @@ private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanc
val prefPartActual = prefPart.get
- if (minPowerOfTwo.size + slack <= prefPartActual.size) // more imbalance than the slack allows
+ if (minPowerOfTwo.size + slack <= prefPartActual.size) { // more imbalance than the slack allows
minPowerOfTwo // prefer balance over locality
- else {
+ } else {
prefPartActual // prefer locality over balance
}
}
@@ -331,7 +331,7 @@ private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanc
*/
def run(): Array[PartitionGroup] = {
setupGroups(math.min(prev.partitions.length, maxPartitions)) // setup the groups (bins)
- throwBalls() // assign partitions (balls) to each group (bins)
+ throwBalls() // assign partitions (balls) to each group (bins)
getPartitions
}
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/5a8abfb7/core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala
index a36abe0..42f52d7 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala
@@ -45,7 +45,7 @@ private[spark] class BlockManagerWorker(val blockManager: BlockManager) extends
Some(new BlockMessageArray(responseMessages).toBufferMessage)
} catch {
case e: Exception => logError("Exception handling buffer message", e)
- return None
+ None
}
}
case otherMessage: Any => {
@@ -111,7 +111,7 @@ private[spark] object BlockManagerWorker extends Logging {
val blockMessageArray = new BlockMessageArray(blockMessage)
val resultMessage = connectionManager.sendMessageReliablySync(
toConnManagerId, blockMessageArray.toBufferMessage)
- return (resultMessage != None)
+ resultMessage != None
}
def syncGetBlock(msg: GetBlock, toConnManagerId: ConnectionManagerId): ByteBuffer = {
@@ -130,7 +130,7 @@ private[spark] object BlockManagerWorker extends Logging {
return blockMessage.getData
})
}
- case None => logDebug("No response message received"); return null
+ case None => logDebug("No response message received")
}
null
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/5a8abfb7/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 14f89d5..f0236ef 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -122,7 +122,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
locations: Seq[Seq[String]] = Nil
): MyRDD = {
val maxPartition = numPartitions - 1
- new MyRDD(sc, dependencies) {
+ val newRDD = new MyRDD(sc, dependencies) {
override def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] =
throw new RuntimeException("should not be reached")
override def getPartitions = (0 to maxPartition).map(i => new Partition {
@@ -135,6 +135,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
Nil
override def toString: String = "DAGSchedulerSuiteRDD " + id
}
+ newRDD
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/5a8abfb7/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala
index 3880e68..2910291 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala
@@ -42,10 +42,7 @@ class JobLoggerSuite extends FunSuite with LocalSparkContext with ShouldMatchers
def buildJobDepTest(jobID: Int, stage: Stage) = buildJobDep(jobID, stage)
}
type MyRDD = RDD[(Int, Int)]
- def makeRdd(
- numPartitions: Int,
- dependencies: List[Dependency[_]]
- ): MyRDD = {
+ def makeRdd(numPartitions: Int, dependencies: List[Dependency[_]]): MyRDD = {
val maxPartition = numPartitions - 1
new MyRDD(sc, dependencies) {
override def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] =
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/5a8abfb7/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index e1fe09e..e56bc02 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -191,10 +191,10 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf)
}
//check for ports
if (srcUri.getPort() != dstUri.getPort()) {
- false
- } else {
- true
+ return false
}
+
+ true
}
/** Copy the file into HDFS if needed. */
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/5a8abfb7/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index c084485..51d9adb 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -206,10 +206,10 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf)
}
//check for ports
if (srcUri.getPort() != dstUri.getPort()) {
- false
- } else {
- true
+ return false
}
+
+ true
}
/** Copy the file into HDFS if needed. */
[4/6] git commit: Fix accidental comment modification.
Posted by pw...@apache.org.
Fix accidental comment modification.
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/f1c5eca4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/f1c5eca4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/f1c5eca4
Branch: refs/heads/master
Commit: f1c5eca494f798fc22c46d245435381a89098fe4
Parents: 91a5636
Author: Henry Saputra <hs...@apache.org>
Authored: Sun Jan 12 10:40:21 2014 -0800
Committer: Henry Saputra <hs...@apache.org>
Committed: Sun Jan 12 10:40:21 2014 -0800
----------------------------------------------------------------------
core/src/main/scala/org/apache/spark/MapOutputTracker.scala | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f1c5eca4/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index 57bdf22..80a611b 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -185,7 +185,7 @@ private[spark] class MapOutputTracker(conf: SparkConf) extends Logging {
private[spark] class MapOutputTrackerMaster(conf: SparkConf)
extends MapOutputTracker(conf) {
- // Cache a serialized version of the output statuses for each shuffle to send them out faster return
+ // Cache a serialized version of the output statuses for each shuffle to send them out faster
private var cacheEpoch = epoch
private val cachedSerializedStatuses = new TimeStampedHashMap[Int, Array[Byte]]
[3/6] git commit: Merge branch 'master' into
remove_simpleredundantreturn_scala
Posted by pw...@apache.org.
Merge branch 'master' into remove_simpleredundantreturn_scala
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/91a56360
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/91a56360
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/91a56360
Branch: refs/heads/master
Commit: 91a563608e301bb243fca3765d569bde65ad747c
Parents: 93a65e5 288a878
Author: Henry Saputra <hs...@apache.org>
Authored: Sun Jan 12 10:34:13 2014 -0800
Committer: Henry Saputra <hs...@apache.org>
Committed: Sun Jan 12 10:34:13 2014 -0800
----------------------------------------------------------------------
bin/pyspark | 7 +-
conf/log4j.properties.template | 4 +-
core/pom.xml | 10 +
.../org/apache/spark/log4j-defaults.properties | 4 +-
.../scala/org/apache/spark/Aggregator.scala | 61 ++--
.../scala/org/apache/spark/SparkContext.scala | 122 ++++---
.../main/scala/org/apache/spark/SparkEnv.scala | 6 +-
.../apache/spark/api/java/JavaDoubleRDD.scala | 12 +-
.../org/apache/spark/api/java/JavaPairRDD.scala | 6 +
.../org/apache/spark/api/java/JavaRDD.scala | 6 +
.../org/apache/spark/api/java/JavaRDDLike.scala | 6 +
.../spark/api/java/JavaSparkContext.scala | 54 ++-
.../scala/org/apache/spark/deploy/Client.scala | 151 ++++++++
.../apache/spark/deploy/ClientArguments.scala | 117 +++++++
.../org/apache/spark/deploy/DeployMessage.scala | 52 ++-
.../apache/spark/deploy/DriverDescription.scala | 29 ++
.../apache/spark/deploy/client/AppClient.scala | 201 +++++++++++
.../spark/deploy/client/AppClientListener.scala | 39 +++
.../org/apache/spark/deploy/client/Client.scala | 200 -----------
.../spark/deploy/client/ClientListener.scala | 39 ---
.../apache/spark/deploy/client/TestClient.scala | 4 +-
.../apache/spark/deploy/master/DriverInfo.scala | 36 ++
.../spark/deploy/master/DriverState.scala | 33 ++
.../master/FileSystemPersistenceEngine.scala | 17 +-
.../org/apache/spark/deploy/master/Master.scala | 189 +++++++++-
.../spark/deploy/master/PersistenceEngine.scala | 11 +-
.../apache/spark/deploy/master/WorkerInfo.scala | 20 +-
.../master/ZooKeeperPersistenceEngine.scala | 14 +-
.../deploy/master/ui/ApplicationPage.scala | 4 +-
.../spark/deploy/master/ui/IndexPage.scala | 56 ++-
.../spark/deploy/worker/CommandUtils.scala | 63 ++++
.../spark/deploy/worker/DriverRunner.scala | 234 +++++++++++++
.../spark/deploy/worker/DriverWrapper.scala | 31 ++
.../spark/deploy/worker/ExecutorRunner.scala | 67 +---
.../org/apache/spark/deploy/worker/Worker.scala | 63 +++-
.../spark/deploy/worker/WorkerWatcher.scala | 55 +++
.../spark/deploy/worker/ui/IndexPage.scala | 65 +++-
.../spark/deploy/worker/ui/WorkerWebUI.scala | 43 ++-
.../executor/CoarseGrainedExecutorBackend.scala | 27 +-
.../org/apache/spark/executor/Executor.scala | 5 +
.../org/apache/spark/rdd/CoGroupedRDD.scala | 88 +++--
.../scala/org/apache/spark/rdd/HadoopRDD.scala | 37 +-
.../org/apache/spark/rdd/NewHadoopRDD.scala | 36 +-
.../org/apache/spark/rdd/PairRDDFunctions.scala | 7 +-
.../apache/spark/scheduler/DAGScheduler.scala | 4 +-
.../cluster/SparkDeploySchedulerBackend.scala | 10 +-
.../org/apache/spark/storage/BlockId.scala | 12 +-
.../org/apache/spark/storage/BlockManager.scala | 4 +-
.../spark/storage/BlockManagerMasterActor.scala | 7 +-
.../spark/storage/BlockObjectWriter.scala | 4 +
.../apache/spark/storage/DiskBlockManager.scala | 11 +-
.../spark/storage/ShuffleBlockManager.scala | 2 +-
.../org/apache/spark/ui/jobs/StagePage.scala | 2 +-
.../org/apache/spark/ui/jobs/StageTable.scala | 2 +-
.../org/apache/spark/util/AppendOnlyMap.scala | 237 -------------
.../apache/spark/util/TimeStampedHashMap.scala | 17 +-
.../scala/org/apache/spark/util/Utils.scala | 28 +-
.../scala/org/apache/spark/util/Vector.scala | 8 +
.../spark/util/collection/AppendOnlyMap.scala | 297 ++++++++++++++++
.../util/collection/ExternalAppendOnlyMap.scala | 350 +++++++++++++++++++
.../collection/SizeTrackingAppendOnlyMap.scala | 101 ++++++
.../apache/spark/deploy/JsonProtocolSuite.scala | 40 ++-
.../spark/deploy/worker/DriverRunnerTest.scala | 131 +++++++
.../deploy/worker/ExecutorRunnerTest.scala | 4 +-
.../deploy/worker/WorkerWatcherSuite.scala | 32 ++
.../apache/spark/util/AppendOnlyMapSuite.scala | 154 --------
.../util/SizeTrackingAppendOnlyMapSuite.scala | 120 +++++++
.../org/apache/spark/util/VectorSuite.scala | 44 +++
.../util/collection/AppendOnlyMapSuite.scala | 198 +++++++++++
.../collection/ExternalAppendOnlyMapSuite.scala | 230 ++++++++++++
docs/configuration.md | 25 +-
docs/python-programming-guide.md | 5 +-
docs/running-on-yarn.md | 15 +-
docs/spark-standalone.md | 38 +-
ec2/spark_ec2.py | 12 +-
.../streaming/examples/JavaFlumeEventCount.java | 2 +
.../streaming/examples/JavaKafkaWordCount.java | 2 +
.../examples/JavaNetworkWordCount.java | 11 +-
.../streaming/examples/JavaQueueStream.java | 2 +
.../spark/examples/DriverSubmissionTest.scala | 46 +++
.../streaming/examples/ActorWordCount.scala | 12 +-
.../streaming/examples/FlumeEventCount.scala | 4 +-
.../streaming/examples/HdfsWordCount.scala | 3 +-
.../streaming/examples/KafkaWordCount.scala | 5 +-
.../streaming/examples/MQTTWordCount.scala | 8 +-
.../streaming/examples/NetworkWordCount.scala | 7 +-
.../spark/streaming/examples/QueueStream.scala | 8 +-
.../streaming/examples/RawNetworkGrep.scala | 5 +-
.../examples/RecoverableNetworkWordCount.scala | 118 +++++++
.../examples/StatefulNetworkWordCount.scala | 2 +
.../streaming/examples/StreamingExamples.scala | 21 ++
.../streaming/examples/TwitterAlgebirdCMS.scala | 10 +-
.../streaming/examples/TwitterAlgebirdHLL.scala | 9 +-
.../streaming/examples/TwitterPopularTags.scala | 2 +
.../streaming/examples/ZeroMQWordCount.scala | 1 +
.../examples/clickstream/PageViewStream.scala | 4 +-
external/kafka/pom.xml | 4 +-
.../spark/streaming/mqtt/MQTTInputDStream.scala | 2 +-
.../apache/spark/mllib/recommendation/ALS.scala | 15 +-
pom.xml | 17 +
project/SparkBuild.scala | 19 +-
.../org/apache/spark/streaming/Checkpoint.scala | 188 ++++++----
.../org/apache/spark/streaming/DStream.scala | 17 +-
.../spark/streaming/DStreamCheckpointData.scala | 106 +++---
.../apache/spark/streaming/DStreamGraph.scala | 38 +-
.../spark/streaming/StreamingContext.scala | 75 +++-
.../api/java/JavaStreamingContext.scala | 96 ++++-
.../streaming/dstream/FileInputDStream.scala | 42 ++-
.../streaming/scheduler/JobGenerator.scala | 31 +-
.../streaming/util/MasterFailureTest.scala | 55 +--
.../apache/spark/streaming/JavaAPISuite.java | 29 +-
.../spark/streaming/CheckpointSuite.scala | 10 +-
.../spark/deploy/yarn/ApplicationMaster.scala | 13 +-
.../spark/deploy/yarn/WorkerLauncher.scala | 28 +-
.../cluster/YarnClientSchedulerBackend.scala | 50 +--
.../spark/deploy/yarn/ApplicationMaster.scala | 13 +-
.../org/apache/spark/deploy/yarn/Client.scala | 1 +
.../spark/deploy/yarn/WorkerLauncher.scala | 28 +-
118 files changed, 4403 insertions(+), 1231 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/91a56360/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/91a56360/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/91a56360/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/91a56360/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/91a56360/core/src/main/scala/org/apache/spark/util/Vector.scala
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/91a56360/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala
index 0000000,d98c7aa..b8c852b
mode 000000,100644..100644
--- a/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala
@@@ -1,0 -1,297 +1,297 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.spark.util.collection
+
+ import java.util.{Arrays, Comparator}
+
+ /**
+ * A simple open hash table optimized for the append-only use case, where keys
+ * are never removed, but the value for each key may be changed.
+ *
+ * This implementation uses quadratic probing with a power-of-2 hash table
+ * size, which is guaranteed to explore all spaces for each key (see
+ * http://en.wikipedia.org/wiki/Quadratic_probing).
+ *
+ * TODO: Cache the hash values of each key? java.util.HashMap does that.
+ */
+ private[spark]
+ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K,
+ V)] with Serializable {
+ require(initialCapacity <= (1 << 29), "Can't make capacity bigger than 2^29 elements")
+ require(initialCapacity >= 1, "Invalid initial capacity")
+
+ private var capacity = nextPowerOf2(initialCapacity)
+ private var mask = capacity - 1
+ private var curSize = 0
+ private var growThreshold = (LOAD_FACTOR * capacity).toInt
+
+ // Holds keys and values in the same array for memory locality; specifically, the order of
+ // elements is key0, value0, key1, value1, key2, value2, etc.
+ private var data = new Array[AnyRef](2 * capacity)
+
+ // Treat the null key differently so we can use nulls in "data" to represent empty items.
+ private var haveNullValue = false
+ private var nullValue: V = null.asInstanceOf[V]
+
+ // Triggered by destructiveSortedIterator; the underlying data array may no longer be used
+ private var destroyed = false
+ private val destructionMessage = "Map state is invalid from destructive sorting!"
+
+ private val LOAD_FACTOR = 0.7
+
+ /** Get the value for a given key */
+ def apply(key: K): V = {
+ assert(!destroyed, destructionMessage)
+ val k = key.asInstanceOf[AnyRef]
+ if (k.eq(null)) {
+ return nullValue
+ }
+ var pos = rehash(k.hashCode) & mask
+ var i = 1
+ while (true) {
+ val curKey = data(2 * pos)
+ if (k.eq(curKey) || k.equals(curKey)) {
+ return data(2 * pos + 1).asInstanceOf[V]
+ } else if (curKey.eq(null)) {
+ return null.asInstanceOf[V]
+ } else {
+ val delta = i
+ pos = (pos + delta) & mask
+ i += 1
+ }
+ }
- return null.asInstanceOf[V]
++ null.asInstanceOf[V]
+ }
+
+ /** Set the value for a key */
+ def update(key: K, value: V): Unit = {
+ assert(!destroyed, destructionMessage)
+ val k = key.asInstanceOf[AnyRef]
+ if (k.eq(null)) {
+ if (!haveNullValue) {
+ incrementSize()
+ }
+ nullValue = value
+ haveNullValue = true
+ return
+ }
+ var pos = rehash(key.hashCode) & mask
+ var i = 1
+ while (true) {
+ val curKey = data(2 * pos)
+ if (curKey.eq(null)) {
+ data(2 * pos) = k
+ data(2 * pos + 1) = value.asInstanceOf[AnyRef]
+ incrementSize() // Since we added a new key
+ return
+ } else if (k.eq(curKey) || k.equals(curKey)) {
+ data(2 * pos + 1) = value.asInstanceOf[AnyRef]
+ return
+ } else {
+ val delta = i
+ pos = (pos + delta) & mask
+ i += 1
+ }
+ }
+ }
+
+ /**
+ * Set the value for key to updateFunc(hadValue, oldValue), where oldValue will be the old value
+ * for key, if any, or null otherwise. Returns the newly updated value.
+ */
+ def changeValue(key: K, updateFunc: (Boolean, V) => V): V = {
+ assert(!destroyed, destructionMessage)
+ val k = key.asInstanceOf[AnyRef]
+ if (k.eq(null)) {
+ if (!haveNullValue) {
+ incrementSize()
+ }
+ nullValue = updateFunc(haveNullValue, nullValue)
+ haveNullValue = true
+ return nullValue
+ }
+ var pos = rehash(k.hashCode) & mask
+ var i = 1
+ while (true) {
+ val curKey = data(2 * pos)
+ if (k.eq(curKey) || k.equals(curKey)) {
+ val newValue = updateFunc(true, data(2 * pos + 1).asInstanceOf[V])
+ data(2 * pos + 1) = newValue.asInstanceOf[AnyRef]
+ return newValue
+ } else if (curKey.eq(null)) {
+ val newValue = updateFunc(false, null.asInstanceOf[V])
+ data(2 * pos) = k
+ data(2 * pos + 1) = newValue.asInstanceOf[AnyRef]
+ incrementSize()
+ return newValue
+ } else {
+ val delta = i
+ pos = (pos + delta) & mask
+ i += 1
+ }
+ }
+ null.asInstanceOf[V] // Never reached but needed to keep compiler happy
+ }
+
+ /** Iterator method from Iterable */
+ override def iterator: Iterator[(K, V)] = {
+ assert(!destroyed, destructionMessage)
+ new Iterator[(K, V)] {
+ var pos = -1
+
+ /** Get the next value we should return from next(), or null if we're finished iterating */
+ def nextValue(): (K, V) = {
+ if (pos == -1) { // Treat position -1 as looking at the null value
+ if (haveNullValue) {
+ return (null.asInstanceOf[K], nullValue)
+ }
+ pos += 1
+ }
+ while (pos < capacity) {
+ if (!data(2 * pos).eq(null)) {
+ return (data(2 * pos).asInstanceOf[K], data(2 * pos + 1).asInstanceOf[V])
+ }
+ pos += 1
+ }
+ null
+ }
+
+ override def hasNext: Boolean = nextValue() != null
+
+ override def next(): (K, V) = {
+ val value = nextValue()
+ if (value == null) {
+ throw new NoSuchElementException("End of iterator")
+ }
+ pos += 1
+ value
+ }
+ }
+ }
+
+ override def size: Int = curSize
+
+ /** Increase table size by 1, rehashing if necessary */
+ private def incrementSize() {
+ curSize += 1
+ if (curSize > growThreshold) {
+ growTable()
+ }
+ }
+
+ /**
+ * Re-hash a value to deal better with hash functions that don't differ in the lower bits.
+ * We use the Murmur Hash 3 finalization step that's also used in fastutil.
+ */
+ private def rehash(h: Int): Int = {
+ it.unimi.dsi.fastutil.HashCommon.murmurHash3(h)
+ }
+
+ /** Double the table's size and re-hash everything */
+ protected def growTable() {
+ val newCapacity = capacity * 2
+ if (newCapacity >= (1 << 30)) {
+ // We can't make the table this big because we want an array of 2x
+ // that size for our data, but array sizes are at most Int.MaxValue
+ throw new Exception("Can't make capacity bigger than 2^29 elements")
+ }
+ val newData = new Array[AnyRef](2 * newCapacity)
+ val newMask = newCapacity - 1
+ // Insert all our old values into the new array. Note that because our old keys are
+ // unique, there's no need to check for equality here when we insert.
+ var oldPos = 0
+ while (oldPos < capacity) {
+ if (!data(2 * oldPos).eq(null)) {
+ val key = data(2 * oldPos)
+ val value = data(2 * oldPos + 1)
+ var newPos = rehash(key.hashCode) & newMask
+ var i = 1
+ var keepGoing = true
+ while (keepGoing) {
+ val curKey = newData(2 * newPos)
+ if (curKey.eq(null)) {
+ newData(2 * newPos) = key
+ newData(2 * newPos + 1) = value
+ keepGoing = false
+ } else {
+ val delta = i
+ newPos = (newPos + delta) & newMask
+ i += 1
+ }
+ }
+ }
+ oldPos += 1
+ }
+ data = newData
+ capacity = newCapacity
+ mask = newMask
+ growThreshold = (LOAD_FACTOR * newCapacity).toInt
+ }
+
+ private def nextPowerOf2(n: Int): Int = {
+ val highBit = Integer.highestOneBit(n)
+ if (highBit == n) n else highBit << 1
+ }
+
+ /**
+ * Return an iterator of the map in sorted order. This provides a way to sort the map without
+ * using additional memory, at the expense of destroying the validity of the map.
+ */
+ def destructiveSortedIterator(cmp: Comparator[(K, V)]): Iterator[(K, V)] = {
+ destroyed = true
+ // Pack KV pairs into the front of the underlying array
+ var keyIndex, newIndex = 0
+ while (keyIndex < capacity) {
+ if (data(2 * keyIndex) != null) {
+ data(newIndex) = (data(2 * keyIndex), data(2 * keyIndex + 1))
+ newIndex += 1
+ }
+ keyIndex += 1
+ }
+ assert(curSize == newIndex + (if (haveNullValue) 1 else 0))
+
+ // Sort by the given ordering
+ val rawOrdering = new Comparator[AnyRef] {
+ def compare(x: AnyRef, y: AnyRef): Int = {
+ cmp.compare(x.asInstanceOf[(K, V)], y.asInstanceOf[(K, V)])
+ }
+ }
+ Arrays.sort(data, 0, newIndex, rawOrdering)
+
+ new Iterator[(K, V)] {
+ var i = 0
+ var nullValueReady = haveNullValue
+ def hasNext: Boolean = (i < newIndex || nullValueReady)
+ def next(): (K, V) = {
+ if (nullValueReady) {
+ nullValueReady = false
+ (null.asInstanceOf[K], nullValue)
+ } else {
+ val item = data(i).asInstanceOf[(K, V)]
+ i += 1
+ item
+ }
+ }
+ }
+ }
+
+ /**
+ * Return whether the next insert will cause the map to grow
+ */
+ def atGrowThreshold: Boolean = curSize == growThreshold
+ }
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/91a56360/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/91a56360/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/91a56360/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/91a56360/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
[6/6] git commit: Merge pull request #395 from
hsaputra/remove_simpleredundantreturn_scala
Posted by pw...@apache.org.
Merge pull request #395 from hsaputra/remove_simpleredundantreturn_scala
Remove simple redundant return statements for Scala methods/functions
Remove simple redundant return statements for Scala methods/functions:
-) Only change simple return statements at the end of method
-) Ignore the complex if-else check
-) Ignore the ones inside synchronized
-) Add small changes to making var to val if possible and remove () for simple get
This hopefully makes the review simpler =)
Pass compile and tests.
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/0ab505a2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/0ab505a2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/0ab505a2
Branch: refs/heads/master
Commit: 0ab505a29e21b5a03928e0bbd3950f6f8e08ae32
Parents: 405bfe8 5a8abfb
Author: Patrick Wendell <pw...@gmail.com>
Authored: Sun Jan 12 21:31:04 2014 -0800
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Sun Jan 12 21:31:04 2014 -0800
----------------------------------------------------------------------
.../scala/org/apache/spark/Accumulators.scala | 2 +-
.../scala/org/apache/spark/CacheManager.scala | 4 +--
.../scala/org/apache/spark/HttpFileServer.scala | 6 ++--
.../main/scala/org/apache/spark/Logging.scala | 2 +-
.../org/apache/spark/MapOutputTracker.scala | 2 +-
.../scala/org/apache/spark/Partitioner.scala | 4 +--
.../scala/org/apache/spark/SparkContext.scala | 11 +++++---
.../org/apache/spark/SparkHadoopWriter.scala | 15 +++++-----
.../org/apache/spark/api/python/PythonRDD.scala | 3 +-
.../spark/broadcast/TorrentBroadcast.scala | 6 ++--
.../apache/spark/network/BufferMessage.scala | 2 +-
.../org/apache/spark/network/Connection.scala | 6 ++--
.../org/apache/spark/network/Message.scala | 6 ++--
.../spark/network/netty/ShuffleSender.scala | 2 +-
.../org/apache/spark/rdd/CoalescedRDD.scala | 10 +++----
.../scala/org/apache/spark/rdd/HadoopRDD.scala | 8 +++---
.../scala/org/apache/spark/rdd/PipedRDD.scala | 2 +-
.../main/scala/org/apache/spark/rdd/RDD.scala | 4 +--
.../spark/scheduler/InputFormatInfo.scala | 8 +++---
.../scala/org/apache/spark/scheduler/Pool.scala | 8 +++---
.../spark/scheduler/SchedulingAlgorithm.scala | 11 ++++----
.../spark/scheduler/SparkListenerBus.scala | 2 +-
.../org/apache/spark/scheduler/Stage.scala | 2 +-
.../org/apache/spark/scheduler/TaskResult.scala | 2 +-
.../spark/scheduler/TaskResultGetter.scala | 2 +-
.../apache/spark/scheduler/TaskSetManager.scala | 14 +++++-----
.../mesos/CoarseMesosSchedulerBackend.scala | 2 +-
.../cluster/mesos/MesosSchedulerBackend.scala | 6 ++--
.../org/apache/spark/storage/BlockManager.scala | 2 +-
.../spark/storage/BlockManagerWorker.scala | 20 +++++++-------
.../org/apache/spark/storage/BlockMessage.scala | 2 +-
.../spark/storage/BlockMessageArray.scala | 2 +-
.../org/apache/spark/storage/MemoryStore.scala | 2 +-
.../org/apache/spark/storage/StorageLevel.scala | 2 +-
.../org/apache/spark/util/ClosureCleaner.scala | 10 +++----
.../org/apache/spark/util/SizeEstimator.scala | 10 +++----
.../scala/org/apache/spark/util/Utils.scala | 19 ++++++-------
.../scala/org/apache/spark/util/Vector.scala | 12 ++++----
.../spark/util/collection/AppendOnlyMap.scala | 2 +-
.../spark/scheduler/ClusterSchedulerSuite.scala | 9 +++---
.../spark/scheduler/DAGSchedulerSuite.scala | 3 +-
.../apache/spark/scheduler/JobLoggerSuite.scala | 7 ++---
.../apache/spark/util/ClosureCleanerSuite.scala | 14 +++++-----
.../org/apache/spark/examples/LocalALS.scala | 8 +++---
.../org/apache/spark/examples/LocalFileLR.scala | 2 +-
.../org/apache/spark/examples/LocalKMeans.scala | 2 +-
.../org/apache/spark/examples/SparkALS.scala | 6 ++--
.../org/apache/spark/examples/SparkHdfsLR.scala | 2 +-
.../org/apache/spark/examples/SparkKMeans.scala | 12 ++++----
.../clickstream/PageViewGenerator.scala | 2 +-
.../spark/mllib/api/python/PythonMLLibAPI.scala | 29 ++++++++++----------
.../org/apache/spark/streaming/Checkpoint.scala | 2 +-
.../streaming/dstream/FileInputDStream.scala | 2 +-
.../spark/streaming/dstream/StateDStream.scala | 8 +++---
.../scheduler/StreamingListenerBus.scala | 2 +-
.../org/apache/spark/streaming/util/Clock.scala | 4 +--
.../spark/streaming/util/RawTextHelper.scala | 2 +-
.../org/apache/spark/deploy/yarn/Client.scala | 7 +++--
.../spark/deploy/yarn/WorkerLauncher.scala | 8 +++---
.../spark/deploy/yarn/WorkerRunnable.scala | 7 ++---
.../yarn/ClientDistributedCacheManager.scala | 10 +++----
.../ClientDistributedCacheManagerSuite.scala | 2 +-
.../org/apache/spark/deploy/yarn/Client.scala | 3 +-
63 files changed, 194 insertions(+), 194 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0ab505a2/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
index fc63788,e551c11..17b1328
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
@@@ -93,8 -88,6 +93,8 @@@ private[spark] class SparkListenerBus e
* add overhead in the general case. */
Thread.sleep(10)
}
- return true
+ true
}
+
+ def stop(): Unit = post(SparkListenerShutdown)
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0ab505a2/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0ab505a2/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
----------------------------------------------------------------------
diff --cc streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
index 6e6e22e,73dc520..3063cf1
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
@@@ -83,8 -76,6 +83,8 @@@ private[spark] class StreamingListenerB
* add overhead in the general case. */
Thread.sleep(10)
}
- return true
+ true
}
+
+ def stop(): Unit = post(StreamingListenerShutdown)
}
[2/6] git commit: Remove simple redundant return statement for Scala
methods/functions:
Posted by pw...@apache.org.
Remove simple redundant return statement for Scala methods/functions:
-) Only change simple return statements at the end of method
-) Ignore the complex if-else check
-) Ignore the ones inside synchronized
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/93a65e5f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/93a65e5f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/93a65e5f
Branch: refs/heads/master
Commit: 93a65e5fde64ffed3dbd2a050c1007e077ecd004
Parents: 26cdb5f
Author: Henry Saputra <hs...@apache.org>
Authored: Sun Jan 12 10:30:04 2014 -0800
Committer: Henry Saputra <hs...@apache.org>
Committed: Sun Jan 12 10:30:04 2014 -0800
----------------------------------------------------------------------
.../scala/org/apache/spark/Accumulators.scala | 2 +-
.../scala/org/apache/spark/CacheManager.scala | 4 +--
.../scala/org/apache/spark/HttpFileServer.scala | 6 ++--
.../main/scala/org/apache/spark/Logging.scala | 2 +-
.../org/apache/spark/MapOutputTracker.scala | 4 +--
.../scala/org/apache/spark/Partitioner.scala | 4 +--
.../scala/org/apache/spark/SparkContext.scala | 11 +++++---
.../org/apache/spark/SparkHadoopWriter.scala | 13 ++++-----
.../org/apache/spark/api/python/PythonRDD.scala | 2 +-
.../spark/broadcast/TorrentBroadcast.scala | 6 ++--
.../apache/spark/network/BufferMessage.scala | 2 +-
.../org/apache/spark/network/Connection.scala | 6 ++--
.../org/apache/spark/network/Message.scala | 6 ++--
.../spark/network/netty/ShuffleSender.scala | 2 +-
.../org/apache/spark/rdd/CoalescedRDD.scala | 4 +--
.../scala/org/apache/spark/rdd/HadoopRDD.scala | 8 +++---
.../scala/org/apache/spark/rdd/PipedRDD.scala | 2 +-
.../main/scala/org/apache/spark/rdd/RDD.scala | 4 +--
.../spark/scheduler/InputFormatInfo.scala | 8 +++---
.../scala/org/apache/spark/scheduler/Pool.scala | 8 +++---
.../spark/scheduler/SchedulingAlgorithm.scala | 11 ++++----
.../spark/scheduler/SparkListenerBus.scala | 2 +-
.../org/apache/spark/scheduler/Stage.scala | 2 +-
.../org/apache/spark/scheduler/TaskResult.scala | 2 +-
.../spark/scheduler/TaskResultGetter.scala | 2 +-
.../apache/spark/scheduler/TaskSetManager.scala | 14 +++++-----
.../mesos/CoarseMesosSchedulerBackend.scala | 2 +-
.../cluster/mesos/MesosSchedulerBackend.scala | 6 ++--
.../org/apache/spark/storage/BlockManager.scala | 2 +-
.../spark/storage/BlockManagerWorker.scala | 14 +++++-----
.../org/apache/spark/storage/BlockMessage.scala | 2 +-
.../spark/storage/BlockMessageArray.scala | 2 +-
.../org/apache/spark/storage/MemoryStore.scala | 2 +-
.../org/apache/spark/storage/StorageLevel.scala | 2 +-
.../org/apache/spark/util/AppendOnlyMap.scala | 2 +-
.../org/apache/spark/util/ClosureCleaner.scala | 10 +++----
.../org/apache/spark/util/SizeEstimator.scala | 10 +++----
.../scala/org/apache/spark/util/Utils.scala | 19 ++++++-------
.../scala/org/apache/spark/util/Vector.scala | 12 ++++----
.../spark/scheduler/ClusterSchedulerSuite.scala | 9 +++---
.../spark/scheduler/DAGSchedulerSuite.scala | 2 +-
.../apache/spark/scheduler/JobLoggerSuite.scala | 2 +-
.../apache/spark/util/ClosureCleanerSuite.scala | 14 +++++-----
.../org/apache/spark/examples/LocalALS.scala | 8 +++---
.../org/apache/spark/examples/LocalFileLR.scala | 2 +-
.../org/apache/spark/examples/LocalKMeans.scala | 2 +-
.../org/apache/spark/examples/SparkALS.scala | 6 ++--
.../org/apache/spark/examples/SparkHdfsLR.scala | 2 +-
.../org/apache/spark/examples/SparkKMeans.scala | 12 ++++----
.../clickstream/PageViewGenerator.scala | 2 +-
.../spark/mllib/api/python/PythonMLLibAPI.scala | 29 ++++++++++----------
.../org/apache/spark/streaming/Checkpoint.scala | 2 +-
.../streaming/dstream/FileInputDStream.scala | 2 +-
.../spark/streaming/dstream/StateDStream.scala | 8 +++---
.../scheduler/StreamingListenerBus.scala | 2 +-
.../org/apache/spark/streaming/util/Clock.scala | 4 +--
.../spark/streaming/util/RawTextHelper.scala | 2 +-
.../org/apache/spark/deploy/yarn/Client.scala | 9 +++---
.../spark/deploy/yarn/WorkerLauncher.scala | 8 +++---
.../spark/deploy/yarn/WorkerRunnable.scala | 7 ++---
.../yarn/ClientDistributedCacheManager.scala | 10 +++----
.../ClientDistributedCacheManagerSuite.scala | 2 +-
.../org/apache/spark/deploy/yarn/Client.scala | 5 ++--
63 files changed, 187 insertions(+), 186 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/core/src/main/scala/org/apache/spark/Accumulators.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Accumulators.scala b/core/src/main/scala/org/apache/spark/Accumulators.scala
index 5f73d23..e89ac28 100644
--- a/core/src/main/scala/org/apache/spark/Accumulators.scala
+++ b/core/src/main/scala/org/apache/spark/Accumulators.scala
@@ -218,7 +218,7 @@ private object Accumulators {
def newId: Long = synchronized {
lastId += 1
- return lastId
+ lastId
}
def register(a: Accumulable[_, _], original: Boolean): Unit = synchronized {
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/core/src/main/scala/org/apache/spark/CacheManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala
index 519ecde..8e5dd8a 100644
--- a/core/src/main/scala/org/apache/spark/CacheManager.scala
+++ b/core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -38,7 +38,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
blockManager.get(key) match {
case Some(values) =>
// Partition is already materialized, so just return its values
- return new InterruptibleIterator(context, values.asInstanceOf[Iterator[T]])
+ new InterruptibleIterator(context, values.asInstanceOf[Iterator[T]])
case None =>
// Mark the split as loading (unless someone else marks it first)
@@ -74,7 +74,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
val elements = new ArrayBuffer[Any]
elements ++= computedValues
blockManager.put(key, elements, storageLevel, tellMaster = true)
- return elements.iterator.asInstanceOf[Iterator[T]]
+ elements.iterator.asInstanceOf[Iterator[T]]
} finally {
loading.synchronized {
loading.remove(key)
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/core/src/main/scala/org/apache/spark/HttpFileServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/HttpFileServer.scala b/core/src/main/scala/org/apache/spark/HttpFileServer.scala
index ad1ee20..a885898 100644
--- a/core/src/main/scala/org/apache/spark/HttpFileServer.scala
+++ b/core/src/main/scala/org/apache/spark/HttpFileServer.scala
@@ -47,17 +47,17 @@ private[spark] class HttpFileServer extends Logging {
def addFile(file: File) : String = {
addFileToDir(file, fileDir)
- return serverUri + "/files/" + file.getName
+ serverUri + "/files/" + file.getName
}
def addJar(file: File) : String = {
addFileToDir(file, jarDir)
- return serverUri + "/jars/" + file.getName
+ serverUri + "/jars/" + file.getName
}
def addFileToDir(file: File, dir: File) : String = {
Files.copy(file, new File(dir, file.getName))
- return dir + "/" + file.getName
+ dir + "/" + file.getName
}
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/core/src/main/scala/org/apache/spark/Logging.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Logging.scala b/core/src/main/scala/org/apache/spark/Logging.scala
index 4a34989..9063cae 100644
--- a/core/src/main/scala/org/apache/spark/Logging.scala
+++ b/core/src/main/scala/org/apache/spark/Logging.scala
@@ -41,7 +41,7 @@ trait Logging {
}
log_ = LoggerFactory.getLogger(className)
}
- return log_
+ log_
}
// Log methods that take only a String
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index 77b8ca1..57bdf22 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -139,7 +139,7 @@ private[spark] class MapOutputTracker(conf: SparkConf) extends Logging {
return MapOutputTracker.convertMapStatuses(shuffleId, reduceId, fetchedStatuses)
}
}
- else{
+ else {
throw new FetchFailedException(null, shuffleId, -1, reduceId,
new Exception("Missing all output locations for shuffle " + shuffleId))
}
@@ -185,7 +185,7 @@ private[spark] class MapOutputTracker(conf: SparkConf) extends Logging {
private[spark] class MapOutputTrackerMaster(conf: SparkConf)
extends MapOutputTracker(conf) {
- // Cache a serialized version of the output statuses for each shuffle to send them out faster
+ // Cache a serialized version of the output statuses for each shuffle to send them out faster return
private var cacheEpoch = epoch
private val cachedSerializedStatuses = new TimeStampedHashMap[Int, Array[Byte]]
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/core/src/main/scala/org/apache/spark/Partitioner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala
index 9b043f0..fc0a749 100644
--- a/core/src/main/scala/org/apache/spark/Partitioner.scala
+++ b/core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -53,9 +53,9 @@ object Partitioner {
return r.partitioner.get
}
if (rdd.context.conf.contains("spark.default.parallelism")) {
- return new HashPartitioner(rdd.context.defaultParallelism)
+ new HashPartitioner(rdd.context.defaultParallelism)
} else {
- return new HashPartitioner(bySize.head.partitions.size)
+ new HashPartitioner(bySize.head.partitions.size)
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index f91392b..3d82bfc 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -756,8 +756,11 @@ class SparkContext(
private[spark] def getCallSite(): String = {
val callSite = getLocalProperty("externalCallSite")
- if (callSite == null) return Utils.formatSparkCallSite
- callSite
+ if (callSite == null) {
+ Utils.formatSparkCallSite
+ } else {
+ callSite
+ }
}
/**
@@ -907,7 +910,7 @@ class SparkContext(
*/
private[spark] def clean[F <: AnyRef](f: F): F = {
ClosureCleaner.clean(f)
- return f
+ f
}
/**
@@ -919,7 +922,7 @@ class SparkContext(
val path = new Path(dir, UUID.randomUUID().toString)
val fs = path.getFileSystem(hadoopConfiguration)
fs.mkdirs(path)
- fs.getFileStatus(path).getPath().toString
+ fs.getFileStatus(path).getPath.toString
}
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
index 618d950..bba873a 100644
--- a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
+++ b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
@@ -134,28 +134,28 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
format = conf.value.getOutputFormat()
.asInstanceOf[OutputFormat[AnyRef,AnyRef]]
}
- return format
+ format
}
private def getOutputCommitter(): OutputCommitter = {
if (committer == null) {
committer = conf.value.getOutputCommitter
}
- return committer
+ committer
}
private def getJobContext(): JobContext = {
if (jobContext == null) {
jobContext = newJobContext(conf.value, jID.value)
}
- return jobContext
+ jobContext
}
private def getTaskContext(): TaskAttemptContext = {
if (taskContext == null) {
taskContext = newTaskAttemptContext(conf.value, taID.value)
}
- return taskContext
+ taskContext
}
private def setIDs(jobid: Int, splitid: Int, attemptid: Int) {
@@ -182,7 +182,7 @@ object SparkHadoopWriter {
def createJobID(time: Date, id: Int): JobID = {
val formatter = new SimpleDateFormat("yyyyMMddHHmm")
val jobtrackerID = formatter.format(new Date())
- return new JobID(jobtrackerID, id)
+ new JobID(jobtrackerID, id)
}
def createPathFromString(path: String, conf: JobConf): Path = {
@@ -194,7 +194,6 @@ object SparkHadoopWriter {
if (outputPath == null || fs == null) {
throw new IllegalArgumentException("Incorrectly formatted output path")
}
- outputPath = outputPath.makeQualified(fs)
- return outputPath
+ outputPath.makeQualified(fs)
}
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 40c519b..8830de7 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -95,7 +95,7 @@ private[spark] class PythonRDD[T: ClassTag](
// Return an iterator that read lines from the process's stdout
val stream = new DataInputStream(new BufferedInputStream(worker.getInputStream, bufferSize))
- return new Iterator[Array[Byte]] {
+ new Iterator[Array[Byte]] {
def next(): Array[Byte] = {
val obj = _nextObj
if (hasNext) {
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
index fdf92ec..1d295c6 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
@@ -203,16 +203,16 @@ extends Logging {
}
bais.close()
- var tInfo = TorrentInfo(retVal, blockNum, byteArray.length)
+ val tInfo = TorrentInfo(retVal, blockNum, byteArray.length)
tInfo.hasBlocks = blockNum
- return tInfo
+ tInfo
}
def unBlockifyObject[T](arrayOfBlocks: Array[TorrentBlock],
totalBytes: Int,
totalBlocks: Int): T = {
- var retByteArray = new Array[Byte](totalBytes)
+ val retByteArray = new Array[Byte](totalBytes)
for (i <- 0 until totalBlocks) {
System.arraycopy(arrayOfBlocks(i).byteArray, 0, retByteArray,
i * BLOCK_SIZE, arrayOfBlocks(i).byteArray.length)
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/core/src/main/scala/org/apache/spark/network/BufferMessage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/BufferMessage.scala b/core/src/main/scala/org/apache/spark/network/BufferMessage.scala
index f736bb3..fb4c659 100644
--- a/core/src/main/scala/org/apache/spark/network/BufferMessage.scala
+++ b/core/src/main/scala/org/apache/spark/network/BufferMessage.scala
@@ -46,7 +46,7 @@ class BufferMessage(id_ : Int, val buffers: ArrayBuffer[ByteBuffer], var ackId:
throw new Exception("Max chunk size is " + maxChunkSize)
}
- if (size == 0 && gotChunkForSendingOnce == false) {
+ if (size == 0 && !gotChunkForSendingOnce) {
val newChunk = new MessageChunk(
new MessageChunkHeader(typ, id, 0, 0, ackId, senderAddress), null)
gotChunkForSendingOnce = true
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/core/src/main/scala/org/apache/spark/network/Connection.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/Connection.scala b/core/src/main/scala/org/apache/spark/network/Connection.scala
index 95cb020..cba8477 100644
--- a/core/src/main/scala/org/apache/spark/network/Connection.scala
+++ b/core/src/main/scala/org/apache/spark/network/Connection.scala
@@ -330,7 +330,7 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector,
// Is highly unlikely unless there was an unclean close of socket, etc
registerInterest()
logInfo("Connected to [" + address + "], " + outbox.messages.size + " messages pending")
- return true
+ true
} catch {
case e: Exception => {
logWarning("Error finishing connection to " + address, e)
@@ -385,7 +385,7 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector,
}
}
// should not happen - to keep scala compiler happy
- return true
+ true
}
// This is a hack to determine if remote socket was closed or not.
@@ -559,7 +559,7 @@ private[spark] class ReceivingConnection(channel_ : SocketChannel, selector_ : S
}
}
// should not happen - to keep scala compiler happy
- return true
+ true
}
def onReceive(callback: (Connection, Message) => Unit) {onReceiveCallback = callback}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/core/src/main/scala/org/apache/spark/network/Message.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/Message.scala b/core/src/main/scala/org/apache/spark/network/Message.scala
index f2ecc6d..2612884 100644
--- a/core/src/main/scala/org/apache/spark/network/Message.scala
+++ b/core/src/main/scala/org/apache/spark/network/Message.scala
@@ -61,7 +61,7 @@ private[spark] object Message {
if (dataBuffers.exists(_ == null)) {
throw new Exception("Attempting to create buffer message with null buffer")
}
- return new BufferMessage(getNewId(), new ArrayBuffer[ByteBuffer] ++= dataBuffers, ackId)
+ new BufferMessage(getNewId(), new ArrayBuffer[ByteBuffer] ++= dataBuffers, ackId)
}
def createBufferMessage(dataBuffers: Seq[ByteBuffer]): BufferMessage =
@@ -69,9 +69,9 @@ private[spark] object Message {
def createBufferMessage(dataBuffer: ByteBuffer, ackId: Int): BufferMessage = {
if (dataBuffer == null) {
- return createBufferMessage(Array(ByteBuffer.allocate(0)), ackId)
+ createBufferMessage(Array(ByteBuffer.allocate(0)), ackId)
} else {
- return createBufferMessage(Array(dataBuffer), ackId)
+ createBufferMessage(Array(dataBuffer), ackId)
}
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala b/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala
index 546d921..44204a8 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala
@@ -64,7 +64,7 @@ private[spark] object ShuffleSender {
val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
val subDir = new File(localDirs(dirId), "%02x".format(subDirId))
val file = new File(subDir, blockId.name)
- return new FileSegment(file, 0, file.length())
+ new FileSegment(file, 0, file.length())
}
}
val sender = new ShuffleSender(port, pResovler)
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
index 98da357..a5394a2 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
@@ -296,9 +296,9 @@ private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanc
val prefPartActual = prefPart.get
if (minPowerOfTwo.size + slack <= prefPartActual.size) // more imbalance than the slack allows
- return minPowerOfTwo // prefer balance over locality
+ minPowerOfTwo // prefer balance over locality
else {
- return prefPartActual // prefer locality over balance
+ prefPartActual // prefer locality over balance
}
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 53f77a3..20db7db 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -99,11 +99,11 @@ class HadoopRDD[K, V](
val conf: Configuration = broadcastedConf.value.value
if (conf.isInstanceOf[JobConf]) {
// A user-broadcasted JobConf was provided to the HadoopRDD, so always use it.
- return conf.asInstanceOf[JobConf]
+ conf.asInstanceOf[JobConf]
} else if (HadoopRDD.containsCachedMetadata(jobConfCacheKey)) {
// getJobConf() has been called previously, so there is already a local cache of the JobConf
// needed by this RDD.
- return HadoopRDD.getCachedMetadata(jobConfCacheKey).asInstanceOf[JobConf]
+ HadoopRDD.getCachedMetadata(jobConfCacheKey).asInstanceOf[JobConf]
} else {
// Create a JobConf that will be cached and used across this RDD's getJobConf() calls in the
// local process. The local cache is accessed through HadoopRDD.putCachedMetadata().
@@ -111,7 +111,7 @@ class HadoopRDD[K, V](
val newJobConf = new JobConf(broadcastedConf.value.value)
initLocalJobConfFuncOpt.map(f => f(newJobConf))
HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
- return newJobConf
+ newJobConf
}
}
@@ -127,7 +127,7 @@ class HadoopRDD[K, V](
newInputFormat.asInstanceOf[Configurable].setConf(conf)
}
HadoopRDD.putCachedMetadata(inputFormatCacheKey, newInputFormat)
- return newInputFormat
+ newInputFormat
}
override def getPartitions: Array[Partition] = {
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
index 1dbbe39..d4f396a 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
@@ -96,7 +96,7 @@ class PipedRDD[T: ClassTag](
// Return an iterator that read lines from the process's stdout
val lines = Source.fromInputStream(proc.getInputStream).getLines
- return new Iterator[String] {
+ new Iterator[String] {
def next() = lines.next()
def hasNext = {
if (lines.hasNext) {
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/core/src/main/scala/org/apache/spark/rdd/RDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index f9dc12e..edd4f38 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -764,7 +764,7 @@ abstract class RDD[T: ClassTag](
val entry = iter.next()
m1.put(entry.getKey, m1.getLong(entry.getKey) + entry.getLongValue)
}
- return m1
+ m1
}
val myResult = mapPartitions(countPartition).reduce(mergeMaps)
myResult.asInstanceOf[java.util.Map[T, Long]] // Will be wrapped as a Scala mutable Map
@@ -842,7 +842,7 @@ abstract class RDD[T: ClassTag](
partsScanned += numPartsToTry
}
- return buf.toArray
+ buf.toArray
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala
index 90eb8a7..cc10cc0 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala
@@ -103,7 +103,7 @@ class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Cl
retval ++= SplitInfo.toSplitInfo(inputFormatClazz, path, split)
}
- return retval.toSet
+ retval.toSet
}
// This method does not expect failures, since validate has already passed ...
@@ -121,18 +121,18 @@ class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Cl
elem => retval ++= SplitInfo.toSplitInfo(inputFormatClazz, path, elem)
)
- return retval.toSet
+ retval.toSet
}
private def findPreferredLocations(): Set[SplitInfo] = {
logDebug("mapreduceInputFormat : " + mapreduceInputFormat + ", mapredInputFormat : " + mapredInputFormat +
", inputFormatClazz : " + inputFormatClazz)
if (mapreduceInputFormat) {
- return prefLocsFromMapreduceInputFormat()
+ prefLocsFromMapreduceInputFormat()
}
else {
assert(mapredInputFormat)
- return prefLocsFromMapredInputFormat()
+ prefLocsFromMapredInputFormat()
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Pool.scala b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
index 1791242..4bc13c2 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
@@ -75,12 +75,12 @@ private[spark] class Pool(
return schedulableNameToSchedulable(schedulableName)
}
for (schedulable <- schedulableQueue) {
- var sched = schedulable.getSchedulableByName(schedulableName)
+ val sched = schedulable.getSchedulableByName(schedulableName)
if (sched != null) {
return sched
}
}
- return null
+ null
}
override def executorLost(executorId: String, host: String) {
@@ -92,7 +92,7 @@ private[spark] class Pool(
for (schedulable <- schedulableQueue) {
shouldRevive |= schedulable.checkSpeculatableTasks()
}
- return shouldRevive
+ shouldRevive
}
override def getSortedTaskSetQueue(): ArrayBuffer[TaskSetManager] = {
@@ -101,7 +101,7 @@ private[spark] class Pool(
for (schedulable <- sortedSchedulableQueue) {
sortedTaskSetQueue ++= schedulable.getSortedTaskSetQueue()
}
- return sortedTaskSetQueue
+ sortedTaskSetQueue
}
def increaseRunningTasks(taskNum: Int) {
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/core/src/main/scala/org/apache/spark/scheduler/SchedulingAlgorithm.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulingAlgorithm.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulingAlgorithm.scala
index 3418640..5e62c84 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SchedulingAlgorithm.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulingAlgorithm.scala
@@ -37,9 +37,9 @@ private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm {
res = math.signum(stageId1 - stageId2)
}
if (res < 0) {
- return true
+ true
} else {
- return false
+ false
}
}
}
@@ -56,7 +56,6 @@ private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm {
val minShareRatio2 = runningTasks2.toDouble / math.max(minShare2, 1.0).toDouble
val taskToWeightRatio1 = runningTasks1.toDouble / s1.weight.toDouble
val taskToWeightRatio2 = runningTasks2.toDouble / s2.weight.toDouble
- var res:Boolean = true
var compare:Int = 0
if (s1Needy && !s2Needy) {
@@ -70,11 +69,11 @@ private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm {
}
if (compare < 0) {
- return true
+ true
} else if (compare > 0) {
- return false
+ false
} else {
- return s1.name < s2.name
+ s1.name < s2.name
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
index e7defd7..e551c11 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
@@ -88,6 +88,6 @@ private[spark] class SparkListenerBus() extends Logging {
* add overhead in the general case. */
Thread.sleep(10)
}
- return true
+ true
}
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
index 7cb3fe4..c60e989 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
@@ -96,7 +96,7 @@ private[spark] class Stage(
def newAttemptId(): Int = {
val id = nextAttemptId
nextAttemptId += 1
- return id
+ id
}
val name = callSite.getOrElse(rdd.origin)
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
index e80cc6b..9d3e615 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
@@ -74,6 +74,6 @@ class DirectTaskResult[T](var valueBytes: ByteBuffer, var accumUpdates: Map[Long
def value(): T = {
val resultSer = SparkEnv.get.serializer.newInstance()
- return resultSer.deserialize(valueBytes)
+ resultSer.deserialize(valueBytes)
}
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
index c52d617..35e9544 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
@@ -37,7 +37,7 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul
protected val serializer = new ThreadLocal[SerializerInstance] {
override def initialValue(): SerializerInstance = {
- return sparkEnv.closureSerializer.newInstance()
+ sparkEnv.closureSerializer.newInstance()
}
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index a10e539..fc0ee07 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -228,7 +228,7 @@ private[spark] class TaskSetManager(
return Some(index)
}
}
- return None
+ None
}
/** Check whether a task is currently running an attempt on a given host */
@@ -291,7 +291,7 @@ private[spark] class TaskSetManager(
}
}
- return None
+ None
}
/**
@@ -332,7 +332,7 @@ private[spark] class TaskSetManager(
}
// Finally, if all else has failed, find a speculative task
- return findSpeculativeTask(execId, host, locality)
+ findSpeculativeTask(execId, host, locality)
}
/**
@@ -387,7 +387,7 @@ private[spark] class TaskSetManager(
case _ =>
}
}
- return None
+ None
}
/**
@@ -584,7 +584,7 @@ private[spark] class TaskSetManager(
}
override def getSchedulableByName(name: String): Schedulable = {
- return null
+ null
}
override def addSchedulable(schedulable: Schedulable) {}
@@ -594,7 +594,7 @@ private[spark] class TaskSetManager(
override def getSortedTaskSetQueue(): ArrayBuffer[TaskSetManager] = {
var sortedTaskSetQueue = ArrayBuffer[TaskSetManager](this)
sortedTaskSetQueue += this
- return sortedTaskSetQueue
+ sortedTaskSetQueue
}
/** Called by TaskScheduler when an executor is lost so we can re-enqueue our tasks */
@@ -669,7 +669,7 @@ private[spark] class TaskSetManager(
}
}
}
- return foundTasks
+ foundTasks
}
private def getLocalityWait(level: TaskLocality.TaskLocality): Long = {
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index e16d60c..c27049b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -140,7 +140,7 @@ private[spark] class CoarseMesosSchedulerBackend(
.format(basename, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores))
command.addUris(CommandInfo.URI.newBuilder().setValue(uri))
}
- return command.build()
+ command.build()
}
override def offerRescinded(d: SchedulerDriver, o: OfferID) {}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
index b428c82..4978148 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@ -141,13 +141,13 @@ private[spark] class MesosSchedulerBackend(
// Serialize the map as an array of (String, String) pairs
execArgs = Utils.serialize(props.toArray)
}
- return execArgs
+ execArgs
}
private def setClassLoader(): ClassLoader = {
val oldClassLoader = Thread.currentThread.getContextClassLoader
Thread.currentThread.setContextClassLoader(classLoader)
- return oldClassLoader
+ oldClassLoader
}
private def restoreClassLoader(oldClassLoader: ClassLoader) {
@@ -255,7 +255,7 @@ private[spark] class MesosSchedulerBackend(
.setType(Value.Type.SCALAR)
.setScalar(Value.Scalar.newBuilder().setValue(1).build())
.build()
- return MesosTaskInfo.newBuilder()
+ MesosTaskInfo.newBuilder()
.setTaskId(taskId)
.setSlaveId(SlaveID.newBuilder().setValue(slaveId).build())
.setExecutor(createExecutorInfo(slaveId))
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index c56e2ca..a716b1d 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -412,7 +412,7 @@ private[spark] class BlockManager(
logDebug("The value of block " + blockId + " is null")
}
logDebug("Block " + blockId + " not found")
- return None
+ None
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala
index 21f0036..a36abe0 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala
@@ -42,7 +42,7 @@ private[spark] class BlockManagerWorker(val blockManager: BlockManager) extends
val blockMessages = BlockMessageArray.fromBufferMessage(bufferMessage)
logDebug("Parsed as a block message array")
val responseMessages = blockMessages.map(processBlockMessage).filter(_ != None).map(_.get)
- return Some(new BlockMessageArray(responseMessages).toBufferMessage)
+ Some(new BlockMessageArray(responseMessages).toBufferMessage)
} catch {
case e: Exception => logError("Exception handling buffer message", e)
return None
@@ -50,7 +50,7 @@ private[spark] class BlockManagerWorker(val blockManager: BlockManager) extends
}
case otherMessage: Any => {
logError("Unknown type message received: " + otherMessage)
- return None
+ None
}
}
}
@@ -61,7 +61,7 @@ private[spark] class BlockManagerWorker(val blockManager: BlockManager) extends
val pB = PutBlock(blockMessage.getId, blockMessage.getData, blockMessage.getLevel)
logDebug("Received [" + pB + "]")
putBlock(pB.id, pB.data, pB.level)
- return None
+ None
}
case BlockMessage.TYPE_GET_BLOCK => {
val gB = new GetBlock(blockMessage.getId)
@@ -70,9 +70,9 @@ private[spark] class BlockManagerWorker(val blockManager: BlockManager) extends
if (buffer == null) {
return None
}
- return Some(BlockMessage.fromGotBlock(GotBlock(gB.id, buffer)))
+ Some(BlockMessage.fromGotBlock(GotBlock(gB.id, buffer)))
}
- case _ => return None
+ case _ => None
}
}
@@ -93,7 +93,7 @@ private[spark] class BlockManagerWorker(val blockManager: BlockManager) extends
}
logDebug("GetBlock " + id + " used " + Utils.getUsedTimeMs(startTimeMs)
+ " and got buffer " + buffer)
- return buffer
+ buffer
}
}
@@ -132,6 +132,6 @@ private[spark] object BlockManagerWorker extends Logging {
}
case None => logDebug("No response message received"); return null
}
- return null
+ null
}
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/core/src/main/scala/org/apache/spark/storage/BlockMessage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockMessage.scala b/core/src/main/scala/org/apache/spark/storage/BlockMessage.scala
index 80dcb5a..fbafcf7 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockMessage.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockMessage.scala
@@ -154,7 +154,7 @@ private[spark] class BlockMessage() {
println()
*/
val finishTime = System.currentTimeMillis
- return Message.createBufferMessage(buffers)
+ Message.createBufferMessage(buffers)
}
override def toString: String = {
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/core/src/main/scala/org/apache/spark/storage/BlockMessageArray.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockMessageArray.scala b/core/src/main/scala/org/apache/spark/storage/BlockMessageArray.scala
index a06f50a..5932936 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockMessageArray.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockMessageArray.scala
@@ -96,7 +96,7 @@ class BlockMessageArray(var blockMessages: Seq[BlockMessage]) extends Seq[BlockM
println()
println()
*/
- return Message.createBufferMessage(buffers)
+ Message.createBufferMessage(buffers)
}
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
index 05f676c..27f057b 100644
--- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
@@ -245,7 +245,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
return false
}
}
- return true
+ true
}
override def contains(blockId: BlockId): Boolean = {
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
index b5596df..0f84810 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
@@ -74,7 +74,7 @@ class StorageLevel private(
if (deserialized_) {
ret |= 1
}
- return ret
+ ret
}
override def writeExternal(out: ObjectOutput) {
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/core/src/main/scala/org/apache/spark/util/AppendOnlyMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/AppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/AppendOnlyMap.scala
index 8bb4ee3..edfa58b 100644
--- a/core/src/main/scala/org/apache/spark/util/AppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/AppendOnlyMap.scala
@@ -67,7 +67,7 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi
i += 1
}
}
- return null.asInstanceOf[V]
+ null.asInstanceOf[V]
}
/** Set the value for a key */
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
index 7108595..1df6b87 100644
--- a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
@@ -61,7 +61,7 @@ private[spark] object ClosureCleaner extends Logging {
return f.getType :: Nil // Stop at the first $outer that is not a closure
}
}
- return Nil
+ Nil
}
// Get a list of the outer objects for a given closure object.
@@ -74,7 +74,7 @@ private[spark] object ClosureCleaner extends Logging {
return f.get(obj) :: Nil // Stop at the first $outer that is not a closure
}
}
- return Nil
+ Nil
}
private def getInnerClasses(obj: AnyRef): List[Class[_]] = {
@@ -174,7 +174,7 @@ private[spark] object ClosureCleaner extends Logging {
field.setAccessible(true)
field.set(obj, outer)
}
- return obj
+ obj
}
}
}
@@ -182,7 +182,7 @@ private[spark] object ClosureCleaner extends Logging {
private[spark] class FieldAccessFinder(output: Map[Class[_], Set[String]]) extends ClassVisitor(ASM4) {
override def visitMethod(access: Int, name: String, desc: String,
sig: String, exceptions: Array[String]): MethodVisitor = {
- return new MethodVisitor(ASM4) {
+ new MethodVisitor(ASM4) {
override def visitFieldInsn(op: Int, owner: String, name: String, desc: String) {
if (op == GETFIELD) {
for (cl <- output.keys if cl.getName == owner.replace('/', '.')) {
@@ -215,7 +215,7 @@ private[spark] class InnerClosureFinder(output: Set[Class[_]]) extends ClassVisi
override def visitMethod(access: Int, name: String, desc: String,
sig: String, exceptions: Array[String]): MethodVisitor = {
- return new MethodVisitor(ASM4) {
+ new MethodVisitor(ASM4) {
override def visitMethodInsn(op: Int, owner: String, name: String,
desc: String) {
val argTypes = Type.getArgumentTypes(desc)
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala b/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
index bddb3bb..3cf9489 100644
--- a/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
+++ b/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
@@ -108,7 +108,7 @@ private[spark] object SizeEstimator extends Logging {
val bean = ManagementFactory.newPlatformMXBeanProxy(server,
hotSpotMBeanName, hotSpotMBeanClass)
// TODO: We could use reflection on the VMOption returned ?
- return getVMMethod.invoke(bean, "UseCompressedOops").toString.contains("true")
+ getVMMethod.invoke(bean, "UseCompressedOops").toString.contains("true")
} catch {
case e: Exception => {
// Guess whether they've enabled UseCompressedOops based on whether maxMemory < 32 GB
@@ -141,7 +141,7 @@ private[spark] object SizeEstimator extends Logging {
def dequeue(): AnyRef = {
val elem = stack.last
stack.trimEnd(1)
- return elem
+ elem
}
}
@@ -162,7 +162,7 @@ private[spark] object SizeEstimator extends Logging {
while (!state.isFinished) {
visitSingleObject(state.dequeue(), state)
}
- return state.size
+ state.size
}
private def visitSingleObject(obj: AnyRef, state: SearchState) {
@@ -276,11 +276,11 @@ private[spark] object SizeEstimator extends Logging {
// Create and cache a new ClassInfo
val newInfo = new ClassInfo(shellSize, pointerFields)
classInfos.put(cls, newInfo)
- return newInfo
+ newInfo
}
private def alignSize(size: Long): Long = {
val rem = size % ALIGN_SIZE
- return if (rem == 0) size else (size + ALIGN_SIZE - rem)
+ if (rem == 0) size else (size + ALIGN_SIZE - rem)
}
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 5f12531..f80ed29 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -49,14 +49,14 @@ private[spark] object Utils extends Logging {
val oos = new ObjectOutputStream(bos)
oos.writeObject(o)
oos.close()
- return bos.toByteArray
+ bos.toByteArray
}
/** Deserialize an object using Java serialization */
def deserialize[T](bytes: Array[Byte]): T = {
val bis = new ByteArrayInputStream(bytes)
val ois = new ObjectInputStream(bis)
- return ois.readObject.asInstanceOf[T]
+ ois.readObject.asInstanceOf[T]
}
/** Deserialize an object using Java serialization and the given ClassLoader */
@@ -66,7 +66,7 @@ private[spark] object Utils extends Logging {
override def resolveClass(desc: ObjectStreamClass) =
Class.forName(desc.getName, false, loader)
}
- return ois.readObject.asInstanceOf[T]
+ ois.readObject.asInstanceOf[T]
}
/** Deserialize a Long value (used for {@link org.apache.spark.api.python.PythonPartitioner}) */
@@ -144,7 +144,7 @@ private[spark] object Utils extends Logging {
i += 1
}
}
- return buf
+ buf
}
private val shutdownDeletePaths = new scala.collection.mutable.HashSet[String]()
@@ -428,7 +428,7 @@ private[spark] object Utils extends Logging {
def parseHostPort(hostPort: String): (String, Int) = {
{
// Check cache first.
- var cached = hostPortParseResults.get(hostPort)
+ val cached = hostPortParseResults.get(hostPort)
if (cached != null) return cached
}
@@ -731,7 +731,7 @@ private[spark] object Utils extends Logging {
} catch {
case ise: IllegalStateException => return true
}
- return false
+ false
}
def isSpace(c: Char): Boolean = {
@@ -748,7 +748,7 @@ private[spark] object Utils extends Logging {
var inWord = false
var inSingleQuote = false
var inDoubleQuote = false
- var curWord = new StringBuilder
+ val curWord = new StringBuilder
def endWord() {
buf += curWord.toString
curWord.clear()
@@ -794,7 +794,7 @@ private[spark] object Utils extends Logging {
if (inWord || inDoubleQuote || inSingleQuote) {
endWord()
}
- return buf
+ buf
}
/* Calculates 'x' modulo 'mod', takes to consideration sign of x,
@@ -822,8 +822,7 @@ private[spark] object Utils extends Logging {
/** Returns a copy of the system properties that is thread-safe to iterator over. */
def getSystemProperties(): Map[String, String] = {
- return System.getProperties().clone()
- .asInstanceOf[java.util.Properties].toMap[String, String]
+ System.getProperties.clone().asInstanceOf[java.util.Properties].toMap[String, String]
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/core/src/main/scala/org/apache/spark/util/Vector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Vector.scala b/core/src/main/scala/org/apache/spark/util/Vector.scala
index fe710c5..094edcd 100644
--- a/core/src/main/scala/org/apache/spark/util/Vector.scala
+++ b/core/src/main/scala/org/apache/spark/util/Vector.scala
@@ -25,7 +25,7 @@ class Vector(val elements: Array[Double]) extends Serializable {
def + (other: Vector): Vector = {
if (length != other.length)
throw new IllegalArgumentException("Vectors of different length")
- return Vector(length, i => this(i) + other(i))
+ Vector(length, i => this(i) + other(i))
}
def add(other: Vector) = this + other
@@ -33,7 +33,7 @@ class Vector(val elements: Array[Double]) extends Serializable {
def - (other: Vector): Vector = {
if (length != other.length)
throw new IllegalArgumentException("Vectors of different length")
- return Vector(length, i => this(i) - other(i))
+ Vector(length, i => this(i) - other(i))
}
def subtract(other: Vector) = this - other
@@ -47,7 +47,7 @@ class Vector(val elements: Array[Double]) extends Serializable {
ans += this(i) * other(i)
i += 1
}
- return ans
+ ans
}
/**
@@ -67,7 +67,7 @@ class Vector(val elements: Array[Double]) extends Serializable {
ans += (this(i) + plus(i)) * other(i)
i += 1
}
- return ans
+ ans
}
def += (other: Vector): Vector = {
@@ -102,7 +102,7 @@ class Vector(val elements: Array[Double]) extends Serializable {
ans += (this(i) - other(i)) * (this(i) - other(i))
i += 1
}
- return ans
+ ans
}
def dist(other: Vector): Double = math.sqrt(squaredDist(other))
@@ -117,7 +117,7 @@ object Vector {
def apply(length: Int, initializer: Int => Double): Vector = {
val elements: Array[Double] = Array.tabulate(length)(initializer)
- return new Vector(elements)
+ new Vector(elements)
}
def zeros(length: Int) = new Vector(new Array[Double](length))
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/core/src/test/scala/org/apache/spark/scheduler/ClusterSchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/ClusterSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ClusterSchedulerSuite.scala
index 7bf2020..235d317 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/ClusterSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/ClusterSchedulerSuite.scala
@@ -64,7 +64,7 @@ class FakeTaskSetManager(
}
override def getSchedulableByName(name: String): Schedulable = {
- return null
+ null
}
override def executorLost(executorId: String, host: String): Unit = {
@@ -79,13 +79,14 @@ class FakeTaskSetManager(
{
if (tasksSuccessful + runningTasks < numTasks) {
increaseRunningTasks(1)
- return Some(new TaskDescription(0, execId, "task 0:0", 0, null))
+ Some(new TaskDescription(0, execId, "task 0:0", 0, null))
+ } else {
+ None
}
- return None
}
override def checkSpeculatableTasks(): Boolean = {
- return true
+ true
}
def taskFinished() {
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 2aa259d..14f89d5 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -122,7 +122,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
locations: Seq[Seq[String]] = Nil
): MyRDD = {
val maxPartition = numPartitions - 1
- return new MyRDD(sc, dependencies) {
+ new MyRDD(sc, dependencies) {
override def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] =
throw new RuntimeException("should not be reached")
override def getPartitions = (0 to maxPartition).map(i => new Partition {
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala
index 5cc48ee..3880e68 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala
@@ -47,7 +47,7 @@ class JobLoggerSuite extends FunSuite with LocalSparkContext with ShouldMatchers
dependencies: List[Dependency[_]]
): MyRDD = {
val maxPartition = numPartitions - 1
- return new MyRDD(sc, dependencies) {
+ new MyRDD(sc, dependencies) {
override def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] =
throw new RuntimeException("should not be reached")
override def getPartitions = (0 to maxPartition).map(i => new Partition {
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala
index 0ed366f..de4871d 100644
--- a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala
@@ -61,8 +61,8 @@ class NonSerializable {}
object TestObject {
def run(): Int = {
var nonSer = new NonSerializable
- var x = 5
- return withSpark(new SparkContext("local", "test")) { sc =>
+ val x = 5
+ withSpark(new SparkContext("local", "test")) { sc =>
val nums = sc.parallelize(Array(1, 2, 3, 4))
nums.map(_ + x).reduce(_ + _)
}
@@ -76,7 +76,7 @@ class TestClass extends Serializable {
def run(): Int = {
var nonSer = new NonSerializable
- return withSpark(new SparkContext("local", "test")) { sc =>
+ withSpark(new SparkContext("local", "test")) { sc =>
val nums = sc.parallelize(Array(1, 2, 3, 4))
nums.map(_ + getX).reduce(_ + _)
}
@@ -88,7 +88,7 @@ class TestClassWithoutDefaultConstructor(x: Int) extends Serializable {
def run(): Int = {
var nonSer = new NonSerializable
- return withSpark(new SparkContext("local", "test")) { sc =>
+ withSpark(new SparkContext("local", "test")) { sc =>
val nums = sc.parallelize(Array(1, 2, 3, 4))
nums.map(_ + getX).reduce(_ + _)
}
@@ -103,7 +103,7 @@ class TestClassWithoutFieldAccess {
def run(): Int = {
var nonSer2 = new NonSerializable
var x = 5
- return withSpark(new SparkContext("local", "test")) { sc =>
+ withSpark(new SparkContext("local", "test")) { sc =>
val nums = sc.parallelize(Array(1, 2, 3, 4))
nums.map(_ + x).reduce(_ + _)
}
@@ -115,7 +115,7 @@ object TestObjectWithNesting {
def run(): Int = {
var nonSer = new NonSerializable
var answer = 0
- return withSpark(new SparkContext("local", "test")) { sc =>
+ withSpark(new SparkContext("local", "test")) { sc =>
val nums = sc.parallelize(Array(1, 2, 3, 4))
var y = 1
for (i <- 1 to 4) {
@@ -134,7 +134,7 @@ class TestClassWithNesting(val y: Int) extends Serializable {
def run(): Int = {
var nonSer = new NonSerializable
var answer = 0
- return withSpark(new SparkContext("local", "test")) { sc =>
+ withSpark(new SparkContext("local", "test")) { sc =>
val nums = sc.parallelize(Array(1, 2, 3, 4))
for (i <- 1 to 4) {
var nonSer2 = new NonSerializable
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/examples/src/main/scala/org/apache/spark/examples/LocalALS.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/LocalALS.scala b/examples/src/main/scala/org/apache/spark/examples/LocalALS.scala
index 83db8b9..c8ecbb8 100644
--- a/examples/src/main/scala/org/apache/spark/examples/LocalALS.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/LocalALS.scala
@@ -43,7 +43,7 @@ object LocalALS {
def generateR(): DoubleMatrix2D = {
val mh = factory2D.random(M, F)
val uh = factory2D.random(U, F)
- return algebra.mult(mh, algebra.transpose(uh))
+ algebra.mult(mh, algebra.transpose(uh))
}
def rmse(targetR: DoubleMatrix2D, ms: Array[DoubleMatrix1D],
@@ -56,7 +56,7 @@ object LocalALS {
//println("R: " + r)
blas.daxpy(-1, targetR, r)
val sumSqs = r.aggregate(Functions.plus, Functions.square)
- return sqrt(sumSqs / (M * U))
+ sqrt(sumSqs / (M * U))
}
def updateMovie(i: Int, m: DoubleMatrix1D, us: Array[DoubleMatrix1D],
@@ -80,7 +80,7 @@ object LocalALS {
val ch = new CholeskyDecomposition(XtX)
val Xty2D = factory2D.make(Xty.toArray, F)
val solved2D = ch.solve(Xty2D)
- return solved2D.viewColumn(0)
+ solved2D.viewColumn(0)
}
def updateUser(j: Int, u: DoubleMatrix1D, ms: Array[DoubleMatrix1D],
@@ -104,7 +104,7 @@ object LocalALS {
val ch = new CholeskyDecomposition(XtX)
val Xty2D = factory2D.make(Xty.toArray, F)
val solved2D = ch.solve(Xty2D)
- return solved2D.viewColumn(0)
+ solved2D.viewColumn(0)
}
def main(args: Array[String]) {
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/examples/src/main/scala/org/apache/spark/examples/LocalFileLR.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/LocalFileLR.scala b/examples/src/main/scala/org/apache/spark/examples/LocalFileLR.scala
index fb130ea..9ab5f5a 100644
--- a/examples/src/main/scala/org/apache/spark/examples/LocalFileLR.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/LocalFileLR.scala
@@ -28,7 +28,7 @@ object LocalFileLR {
def parsePoint(line: String): DataPoint = {
val nums = line.split(' ').map(_.toDouble)
- return DataPoint(new Vector(nums.slice(1, D+1)), nums(0))
+ DataPoint(new Vector(nums.slice(1, D+1)), nums(0))
}
def main(args: Array[String]) {
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/examples/src/main/scala/org/apache/spark/examples/LocalKMeans.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/LocalKMeans.scala b/examples/src/main/scala/org/apache/spark/examples/LocalKMeans.scala
index f90ea35..a730464 100644
--- a/examples/src/main/scala/org/apache/spark/examples/LocalKMeans.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/LocalKMeans.scala
@@ -55,7 +55,7 @@ object LocalKMeans {
}
}
- return bestIndex
+ bestIndex
}
def main(args: Array[String]) {
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/examples/src/main/scala/org/apache/spark/examples/SparkALS.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkALS.scala b/examples/src/main/scala/org/apache/spark/examples/SparkALS.scala
index 30c86d8..17bafc2 100644
--- a/examples/src/main/scala/org/apache/spark/examples/SparkALS.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/SparkALS.scala
@@ -44,7 +44,7 @@ object SparkALS {
def generateR(): DoubleMatrix2D = {
val mh = factory2D.random(M, F)
val uh = factory2D.random(U, F)
- return algebra.mult(mh, algebra.transpose(uh))
+ algebra.mult(mh, algebra.transpose(uh))
}
def rmse(targetR: DoubleMatrix2D, ms: Array[DoubleMatrix1D],
@@ -57,7 +57,7 @@ object SparkALS {
//println("R: " + r)
blas.daxpy(-1, targetR, r)
val sumSqs = r.aggregate(Functions.plus, Functions.square)
- return sqrt(sumSqs / (M * U))
+ sqrt(sumSqs / (M * U))
}
def update(i: Int, m: DoubleMatrix1D, us: Array[DoubleMatrix1D],
@@ -83,7 +83,7 @@ object SparkALS {
val ch = new CholeskyDecomposition(XtX)
val Xty2D = factory2D.make(Xty.toArray, F)
val solved2D = ch.solve(Xty2D)
- return solved2D.viewColumn(0)
+ solved2D.viewColumn(0)
}
def main(args: Array[String]) {
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala b/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala
index ff72532..3981906 100644
--- a/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala
@@ -43,7 +43,7 @@ object SparkHdfsLR {
while (i < D) {
x(i) = tok.nextToken.toDouble; i += 1
}
- return DataPoint(new Vector(x), y)
+ DataPoint(new Vector(x), y)
}
def main(args: Array[String]) {
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala b/examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala
index 8c99025..9fe2465 100644
--- a/examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala
@@ -30,7 +30,7 @@ object SparkKMeans {
val rand = new Random(42)
def parseVector(line: String): Vector = {
- return new Vector(line.split(' ').map(_.toDouble))
+ new Vector(line.split(' ').map(_.toDouble))
}
def closestPoint(p: Vector, centers: Array[Vector]): Int = {
@@ -46,7 +46,7 @@ object SparkKMeans {
}
}
- return bestIndex
+ bestIndex
}
def main(args: Array[String]) {
@@ -61,15 +61,15 @@ object SparkKMeans {
val K = args(2).toInt
val convergeDist = args(3).toDouble
- var kPoints = data.takeSample(false, K, 42).toArray
+ val kPoints = data.takeSample(withReplacement = false, K, 42).toArray
var tempDist = 1.0
while(tempDist > convergeDist) {
- var closest = data.map (p => (closestPoint(p, kPoints), (p, 1)))
+ val closest = data.map (p => (closestPoint(p, kPoints), (p, 1)))
- var pointStats = closest.reduceByKey{case ((x1, y1), (x2, y2)) => (x1 + x2, y1 + y2)}
+ val pointStats = closest.reduceByKey{case ((x1, y1), (x2, y2)) => (x1 + x2, y1 + y2)}
- var newPoints = pointStats.map {pair => (pair._1, pair._2._1 / pair._2._2)}.collectAsMap()
+ val newPoints = pointStats.map {pair => (pair._1, pair._2._1 / pair._2._2)}.collectAsMap()
tempDist = 0.0
for (i <- 0 until K) {
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewGenerator.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewGenerator.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewGenerator.scala
index 4fe57de..a260098 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewGenerator.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewGenerator.scala
@@ -65,7 +65,7 @@ object PageViewGenerator {
return item
}
}
- return inputMap.take(1).head._1 // Shouldn't get here if probabilities add up to 1.0
+ inputMap.take(1).head._1 // Shouldn't get here if probabilities add up to 1.0
}
def getNextClickEvent() : String = {
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
index 2d86233..c972a71 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
@@ -48,7 +48,7 @@ class PythonMLLibAPI extends Serializable {
val db = bb.asDoubleBuffer()
val ans = new Array[Double](length.toInt)
db.get(ans)
- return ans
+ ans
}
private def serializeDoubleVector(doubles: Array[Double]): Array[Byte] = {
@@ -60,7 +60,7 @@ class PythonMLLibAPI extends Serializable {
bb.putLong(len)
val db = bb.asDoubleBuffer()
db.put(doubles)
- return bytes
+ bytes
}
private def deserializeDoubleMatrix(bytes: Array[Byte]): Array[Array[Double]] = {
@@ -86,7 +86,7 @@ class PythonMLLibAPI extends Serializable {
ans(i) = new Array[Double](cols.toInt)
db.get(ans(i))
}
- return ans
+ ans
}
private def serializeDoubleMatrix(doubles: Array[Array[Double]]): Array[Byte] = {
@@ -102,11 +102,10 @@ class PythonMLLibAPI extends Serializable {
bb.putLong(rows)
bb.putLong(cols)
val db = bb.asDoubleBuffer()
- var i = 0
for (i <- 0 until rows) {
db.put(doubles(i))
}
- return bytes
+ bytes
}
private def trainRegressionModel(trainFunc: (RDD[LabeledPoint], Array[Double]) => GeneralizedLinearModel,
@@ -121,7 +120,7 @@ class PythonMLLibAPI extends Serializable {
val ret = new java.util.LinkedList[java.lang.Object]()
ret.add(serializeDoubleVector(model.weights))
ret.add(model.intercept: java.lang.Double)
- return ret
+ ret
}
/**
@@ -130,7 +129,7 @@ class PythonMLLibAPI extends Serializable {
def trainLinearRegressionModelWithSGD(dataBytesJRDD: JavaRDD[Array[Byte]],
numIterations: Int, stepSize: Double, miniBatchFraction: Double,
initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
- return trainRegressionModel((data, initialWeights) =>
+ trainRegressionModel((data, initialWeights) =>
LinearRegressionWithSGD.train(data, numIterations, stepSize,
miniBatchFraction, initialWeights),
dataBytesJRDD, initialWeightsBA)
@@ -142,7 +141,7 @@ class PythonMLLibAPI extends Serializable {
def trainLassoModelWithSGD(dataBytesJRDD: JavaRDD[Array[Byte]], numIterations: Int,
stepSize: Double, regParam: Double, miniBatchFraction: Double,
initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
- return trainRegressionModel((data, initialWeights) =>
+ trainRegressionModel((data, initialWeights) =>
LassoWithSGD.train(data, numIterations, stepSize, regParam,
miniBatchFraction, initialWeights),
dataBytesJRDD, initialWeightsBA)
@@ -154,7 +153,7 @@ class PythonMLLibAPI extends Serializable {
def trainRidgeModelWithSGD(dataBytesJRDD: JavaRDD[Array[Byte]], numIterations: Int,
stepSize: Double, regParam: Double, miniBatchFraction: Double,
initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
- return trainRegressionModel((data, initialWeights) =>
+ trainRegressionModel((data, initialWeights) =>
RidgeRegressionWithSGD.train(data, numIterations, stepSize, regParam,
miniBatchFraction, initialWeights),
dataBytesJRDD, initialWeightsBA)
@@ -166,7 +165,7 @@ class PythonMLLibAPI extends Serializable {
def trainSVMModelWithSGD(dataBytesJRDD: JavaRDD[Array[Byte]], numIterations: Int,
stepSize: Double, regParam: Double, miniBatchFraction: Double,
initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
- return trainRegressionModel((data, initialWeights) =>
+ trainRegressionModel((data, initialWeights) =>
SVMWithSGD.train(data, numIterations, stepSize, regParam,
miniBatchFraction, initialWeights),
dataBytesJRDD, initialWeightsBA)
@@ -178,7 +177,7 @@ class PythonMLLibAPI extends Serializable {
def trainLogisticRegressionModelWithSGD(dataBytesJRDD: JavaRDD[Array[Byte]],
numIterations: Int, stepSize: Double, miniBatchFraction: Double,
initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
- return trainRegressionModel((data, initialWeights) =>
+ trainRegressionModel((data, initialWeights) =>
LogisticRegressionWithSGD.train(data, numIterations, stepSize,
miniBatchFraction, initialWeights),
dataBytesJRDD, initialWeightsBA)
@@ -194,7 +193,7 @@ class PythonMLLibAPI extends Serializable {
val model = KMeans.train(data, k, maxIterations, runs, initializationMode)
val ret = new java.util.LinkedList[java.lang.Object]()
ret.add(serializeDoubleMatrix(model.clusterCenters))
- return ret
+ ret
}
/** Unpack a Rating object from an array of bytes */
@@ -204,7 +203,7 @@ class PythonMLLibAPI extends Serializable {
val user = bb.getInt()
val product = bb.getInt()
val rating = bb.getDouble()
- return new Rating(user, product, rating)
+ new Rating(user, product, rating)
}
/** Unpack a tuple of Ints from an array of bytes */
@@ -245,7 +244,7 @@ class PythonMLLibAPI extends Serializable {
def trainALSModel(ratingsBytesJRDD: JavaRDD[Array[Byte]], rank: Int,
iterations: Int, lambda: Double, blocks: Int): MatrixFactorizationModel = {
val ratings = ratingsBytesJRDD.rdd.map(unpackRating)
- return ALS.train(ratings, rank, iterations, lambda, blocks)
+ ALS.train(ratings, rank, iterations, lambda, blocks)
}
/**
@@ -257,6 +256,6 @@ class PythonMLLibAPI extends Serializable {
def trainImplicitALSModel(ratingsBytesJRDD: JavaRDD[Array[Byte]], rank: Int,
iterations: Int, lambda: Double, blocks: Int, alpha: Double): MatrixFactorizationModel = {
val ratings = ratingsBytesJRDD.rdd.map(unpackRating)
- return ALS.trainImplicit(ratings, rank, iterations, lambda, blocks, alpha)
+ ALS.trainImplicit(ratings, rank, iterations, lambda, blocks, alpha)
}
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index ca0115f..ebfb8db 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -203,6 +203,6 @@ class ObjectInputStreamWithLoader(inputStream_ : InputStream, loader: ClassLoade
} catch {
case e: Exception =>
}
- return super.resolveClass(desc)
+ super.resolveClass(desc)
}
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index fb9eda8..a7ba233 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -219,7 +219,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
reset()
return false
}
- return true
+ true
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
index e0ff3cc..b34ba7b 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
@@ -65,7 +65,7 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag](
val cogroupedRDD = parentRDD.cogroup(prevStateRDD, partitioner)
val stateRDD = cogroupedRDD.mapPartitions(finalFunc, preservePartitioning)
//logDebug("Generating state RDD for time " + validTime)
- return Some(stateRDD)
+ Some(stateRDD)
}
case None => { // If parent RDD does not exist
@@ -76,7 +76,7 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag](
updateFuncLocal(i)
}
val stateRDD = prevStateRDD.mapPartitions(finalFunc, preservePartitioning)
- return Some(stateRDD)
+ Some(stateRDD)
}
}
}
@@ -98,11 +98,11 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag](
val groupedRDD = parentRDD.groupByKey(partitioner)
val sessionRDD = groupedRDD.mapPartitions(finalFunc, preservePartitioning)
//logDebug("Generating state RDD for time " + validTime + " (first)")
- return Some(sessionRDD)
+ Some(sessionRDD)
}
case None => { // If parent RDD does not exist, then nothing to do!
//logDebug("Not generating state RDD (no previous state, no parent)")
- return None
+ None
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
index 110a20f..73dc520 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
@@ -76,6 +76,6 @@ private[spark] class StreamingListenerBus() extends Logging {
* add overhead in the general case. */
Thread.sleep(10)
}
- return true
+ true
}
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala
index f67bb2f..c3a849d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala
@@ -66,7 +66,7 @@ class SystemClock() extends Clock {
}
Thread.sleep(sleepTime)
}
- return -1
+ -1
}
}
@@ -96,6 +96,6 @@ class ManualClock() extends Clock {
this.wait(100)
}
}
- return currentTime()
+ currentTime()
}
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala
index 4e6ce6e..5b6c048 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala
@@ -90,7 +90,7 @@ object RawTextHelper {
}
}
}
- return taken.toIterator
+ taken.toIterator
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 23781ea..e1fe09e 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -158,7 +158,7 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf)
val appContext = Records.newRecord(classOf[ApplicationSubmissionContext])
appContext.setApplicationId(appId)
appContext.setApplicationName(args.appName)
- return appContext
+ appContext
}
/** See if two file systems are the same or not. */
@@ -191,9 +191,10 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf)
}
//check for ports
if (srcUri.getPort() != dstUri.getPort()) {
- return false
+ false
+ } else {
+ true
}
- return true
}
/** Copy the file into HDFS if needed. */
@@ -299,7 +300,7 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf)
}
UserGroupInformation.getCurrentUser().addCredentials(credentials)
- return localResources
+ localResources
}
def setupLaunchEnv(
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
----------------------------------------------------------------------
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
index ddfec1a..0138d7a 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
@@ -125,7 +125,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
val containerId = ConverterUtils.toContainerId(containerIdString)
val appAttemptId = containerId.getApplicationAttemptId()
logInfo("ApplicationAttemptId: " + appAttemptId)
- return appAttemptId
+ appAttemptId
}
private def registerWithResourceManager(): AMRMProtocol = {
@@ -133,7 +133,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
YarnConfiguration.RM_SCHEDULER_ADDRESS,
YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS))
logInfo("Connecting to ResourceManager at " + rmAddress)
- return rpc.getProxy(classOf[AMRMProtocol], rmAddress, conf).asInstanceOf[AMRMProtocol]
+ rpc.getProxy(classOf[AMRMProtocol], rmAddress, conf).asInstanceOf[AMRMProtocol]
}
private def registerApplicationMaster(): RegisterApplicationMasterResponse = {
@@ -147,7 +147,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
appMasterRequest.setRpcPort(0)
// What do we provide here ? Might make sense to expose something sensible later ?
appMasterRequest.setTrackingUrl("")
- return resourceManager.registerApplicationMaster(appMasterRequest)
+ resourceManager.registerApplicationMaster(appMasterRequest)
}
private def waitForSparkMaster() {
@@ -220,7 +220,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
t.setDaemon(true)
t.start()
logInfo("Started progress reporter thread - sleep time : " + sleepTime)
- return t
+ t
}
private def sendProgress() {
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
----------------------------------------------------------------------
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
index 132630e..d32cdcc 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
@@ -195,7 +195,7 @@ class WorkerRunnable(
}
logInfo("Prepared Local resources " + localResources)
- return localResources
+ localResources
}
def prepareEnvironment: HashMap[String, String] = {
@@ -207,7 +207,7 @@ class WorkerRunnable(
Apps.setEnvFromInputString(env, System.getenv("SPARK_YARN_USER_ENV"))
System.getenv().filterKeys(_.startsWith("SPARK")).foreach { case (k,v) => env(k) = v }
- return env
+ env
}
def connectToCM: ContainerManager = {
@@ -226,8 +226,7 @@ class WorkerRunnable(
val proxy = user
.doAs(new PrivilegedExceptionAction[ContainerManager] {
def run: ContainerManager = {
- return rpc.getProxy(classOf[ContainerManager],
- cmAddress, conf).asInstanceOf[ContainerManager]
+ rpc.getProxy(classOf[ContainerManager], cmAddress, conf).asInstanceOf[ContainerManager]
}
})
proxy
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
index 5f159b0..535abbf 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
@@ -143,7 +143,7 @@ class ClientDistributedCacheManager() extends Logging {
if (isPublic(conf, uri, statCache)) {
return LocalResourceVisibility.PUBLIC
}
- return LocalResourceVisibility.PRIVATE
+ LocalResourceVisibility.PRIVATE
}
/**
@@ -161,7 +161,7 @@ class ClientDistributedCacheManager() extends Logging {
if (!checkPermissionOfOther(fs, current, FsAction.READ, statCache)) {
return false
}
- return ancestorsHaveExecutePermissions(fs, current.getParent(), statCache)
+ ancestorsHaveExecutePermissions(fs, current.getParent(), statCache)
}
/**
@@ -183,7 +183,7 @@ class ClientDistributedCacheManager() extends Logging {
}
current = current.getParent()
}
- return true
+ true
}
/**
@@ -203,7 +203,7 @@ class ClientDistributedCacheManager() extends Logging {
if (otherAction.implies(action)) {
return true
}
- return false
+ false
}
/**
@@ -223,6 +223,6 @@ class ClientDistributedCacheManager() extends Logging {
statCache.put(uri, newStat)
newStat
}
- return stat
+ stat
}
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/93a65e5f/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala b/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala
index 2941356..458df4f 100644
--- a/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala
+++ b/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala
@@ -42,7 +42,7 @@ class ClientDistributedCacheManagerSuite extends FunSuite with MockitoSugar {
class MockClientDistributedCacheManager extends ClientDistributedCacheManager {
override def getVisibility(conf: Configuration, uri: URI, statCache: Map[URI, FileStatus]):
LocalResourceVisibility = {
- return LocalResourceVisibility.PRIVATE
+ LocalResourceVisibility.PRIVATE
}
}