Posted to commits@spark.apache.org by sr...@apache.org on 2017/07/18 07:47:22 UTC
[2/2] spark git commit: [SPARK-21415] Triage scapegoat warnings, part 1
[SPARK-21415] Triage scapegoat warnings, part 1
## What changes were proposed in this pull request?
Address scapegoat warnings for:
- BigDecimal double constructor
- Catching NPE
- Finalizer without super
- List.size is O(n)
- Prefer Seq.empty
- Prefer Set.empty
- reverse.map instead of reverseMap
- Type shadowing
- Unnecessary if condition
- Use .log1p
- Var could be val
In some instances, like Seq.empty, I avoided making the change in test code even where it would be valid, to keep the scope of the change smaller. Those warnings concern performance, which doesn't matter for tests.
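The categories above map to small mechanical rewrites. As a rough illustration (not code from the patch; the names here are hypothetical), a few of the patterns look like this:

```scala
import scala.collection.mutable.ArrayBuffer

// Illustrative sketch only; names are hypothetical, not from the patch.
object ScapegoatFixSketch {
  // Prefer Seq.empty / Set.empty over Seq() / Set(): same value, no varargs call.
  def labels(raw: Option[String]): Seq[String] =
    raw.map(_.split(",").toSeq).getOrElse(Seq.empty)

  // BigDecimal double constructor: BigDecimal(0.1) captures the double's binary
  // rounding error, while BigDecimal("0.1") is exact.
  val tenth: BigDecimal = BigDecimal("0.1")

  // Var could be val: the buffer reference never changes, only its contents.
  def collect(n: Int): Seq[Int] = {
    val buf = ArrayBuffer.empty[Int] // was: var buf = new ArrayBuffer[Int]()
    (0 until n).foreach(buf += _)
    buf.toSeq
  }

  // Unnecessary if condition: `if (cond) true else false` is just `cond`.
  def isDefaultSort(column: String): Boolean = column == "Stage Id"
}
```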
## How was this patch tested?
Existing tests
Author: Sean Owen <so...@cloudera.com>
Closes #18635 from srowen/Scapegoat1.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e26dac5f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e26dac5f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e26dac5f
Branch: refs/heads/master
Commit: e26dac5feb02033f980b1e69c9b0ff50869b6f9e
Parents: 26cd2ca
Author: Sean Owen <so...@cloudera.com>
Authored: Tue Jul 18 08:47:17 2017 +0100
Committer: Sean Owen <so...@cloudera.com>
Committed: Tue Jul 18 08:47:17 2017 +0100
----------------------------------------------------------------------
.../scala/org/apache/spark/SSLOptions.scala | 26 +++++++++---------
.../main/scala/org/apache/spark/SparkEnv.scala | 2 +-
.../main/scala/org/apache/spark/TestUtils.scala | 8 +++---
.../org/apache/spark/api/python/PythonRDD.scala | 1 +
.../scala/org/apache/spark/api/r/SerDe.scala | 3 +--
.../apache/spark/deploy/SparkHadoopUtil.scala | 2 +-
.../deploy/history/FsHistoryProvider.scala | 2 +-
.../spark/deploy/master/ui/MasterWebUI.scala | 2 +-
.../spark/deploy/worker/CommandUtils.scala | 6 ++---
.../main/scala/org/apache/spark/package.scala | 10 +++----
.../org/apache/spark/rdd/CoalescedRDD.scala | 2 +-
.../scala/org/apache/spark/scheduler/Pool.scala | 2 +-
.../org/apache/spark/scheduler/TaskResult.scala | 2 +-
.../apache/spark/scheduler/TaskSetManager.scala | 4 +--
.../cluster/CoarseGrainedSchedulerBackend.scala | 5 ++--
.../org/apache/spark/storage/BlockManager.scala | 6 ++---
.../org/apache/spark/ui/jobs/AllJobsPage.scala | 2 +-
.../apache/spark/ui/jobs/AllStagesPage.scala | 2 +-
.../org/apache/spark/ui/jobs/PoolPage.scala | 2 +-
.../org/apache/spark/ui/jobs/StagePage.scala | 2 +-
.../org/apache/spark/ui/jobs/StageTable.scala | 2 +-
.../org/apache/spark/ui/storage/RDDPage.scala | 2 +-
.../org/apache/spark/util/ClosureCleaner.scala | 26 +++++++++---------
.../org/apache/spark/util/JsonProtocol.scala | 4 +--
.../scala/org/apache/spark/util/Utils.scala | 6 ++---
.../org/apache/spark/examples/LocalFileLR.scala | 6 ++---
.../org/apache/spark/examples/LocalKMeans.scala | 15 +++++------
.../org/apache/spark/examples/LocalLR.scala | 4 +--
.../org/apache/spark/examples/SparkHdfsLR.scala | 6 ++---
.../org/apache/spark/examples/SparkLR.scala | 2 +-
.../spark/examples/ml/DecisionTreeExample.scala | 6 ++---
.../apache/spark/examples/ml/GBTExample.scala | 6 ++---
.../spark/examples/ml/RandomForestExample.scala | 6 ++---
.../examples/mllib/DecisionTreeRunner.scala | 2 +-
.../spark/sql/kafka010/KafkaTestUtils.scala | 2 +-
.../spark/streaming/kafka010/KafkaRDD.scala | 2 +-
.../streaming/kafka010/KafkaTestUtils.scala | 2 +-
.../spark/graphx/impl/EdgePartition.scala | 4 +--
.../apache/spark/graphx/impl/GraphImpl.scala | 2 +-
.../spark/graphx/util/GraphGenerators.scala | 3 ++-
.../ml/classification/LogisticRegression.scala | 4 +--
.../spark/ml/feature/RFormulaParser.scala | 2 +-
.../spark/ml/regression/LinearRegression.scala | 2 +-
.../apache/spark/mllib/clustering/KMeans.scala | 2 +-
.../mllib/linalg/EigenValueDecomposition.scala | 14 +++++-----
.../apache/spark/mllib/optimization/LBFGS.scala | 2 +-
.../stat/correlation/SpearmanCorrelation.scala | 2 +-
.../spark/mllib/stat/test/StreamingTest.scala | 2 +-
project/SparkBuild.scala | 4 +--
.../spark/deploy/mesos/ui/DriverPage.scala | 4 +--
.../cluster/mesos/MesosSchedulerUtils.scala | 2 +-
.../spark/sql/catalyst/analysis/Analyzer.scala | 5 ++--
.../expressions/collectionOperations.scala | 4 +--
.../optimizer/CostBasedJoinReorder.scala | 2 +-
.../sql/catalyst/optimizer/expressions.scala | 2 +-
.../spark/sql/catalyst/parser/AstBuilder.scala | 2 +-
.../spark/sql/catalyst/planning/patterns.scala | 2 +-
.../statsEstimation/FilterEstimation.scala | 14 +++++-----
.../expressions/MathExpressionsSuite.scala | 6 ++---
.../apache/spark/sql/types/DecimalSuite.scala | 4 +--
.../spark/sql/execution/command/ddl.scala | 4 +--
.../datasources/jdbc/JDBCRelation.scala | 3 +--
.../datasources/parquet/ParquetFileFormat.scala | 2 +-
.../apache/spark/sql/execution/objects.scala | 5 +---
.../sql/execution/r/MapPartitionsRWrapper.scala | 3 +--
.../streaming/CompactibleFileStreamLog.scala | 8 ++----
.../execution/streaming/StreamExecution.scala | 2 +-
.../spark/sql/DataFrameAggregateSuite.scala | 28 ++++++++++----------
.../org/apache/spark/sql/DataFrameSuite.scala | 4 +--
.../org/apache/spark/sql/SQLQuerySuite.scala | 4 +--
.../apache/spark/sql/StringFunctionsSuite.scala | 2 +-
.../execution/datasources/json/JsonSuite.scala | 4 +--
.../ParquetPartitionDiscoverySuite.scala | 2 +-
.../org/apache/spark/sql/hive/HiveUtils.scala | 2 +-
.../org/apache/spark/sql/hive/TableReader.scala | 8 +++---
.../spark/sql/hive/client/HiveClientImpl.scala | 2 +-
.../spark/sql/hive/HiveInspectorSuite.scala | 2 +-
.../spark/sql/hive/client/HiveClientSuite.scala | 9 +++----
.../spark/streaming/StreamingContext.scala | 2 +-
.../streaming/api/java/JavaDStreamLike.scala | 2 +-
.../spark/streaming/dstream/StateDStream.scala | 2 +-
.../spark/streaming/util/RawTextHelper.scala | 1 -
82 files changed, 186 insertions(+), 195 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/core/src/main/scala/org/apache/spark/SSLOptions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SSLOptions.scala b/core/src/main/scala/org/apache/spark/SSLOptions.scala
index f86fd20..477b019 100644
--- a/core/src/main/scala/org/apache/spark/SSLOptions.scala
+++ b/core/src/main/scala/org/apache/spark/SSLOptions.scala
@@ -94,21 +94,23 @@ private[spark] case class SSLOptions(
* are supported by the current Java security provider for this protocol.
*/
private val supportedAlgorithms: Set[String] = if (enabledAlgorithms.isEmpty) {
- Set()
+ Set.empty
} else {
var context: SSLContext = null
- try {
- context = SSLContext.getInstance(protocol.orNull)
- /* The set of supported algorithms does not depend upon the keys, trust, or
+ if (protocol.isEmpty) {
+ logDebug("No SSL protocol specified")
+ context = SSLContext.getDefault
+ } else {
+ try {
+ context = SSLContext.getInstance(protocol.get)
+ /* The set of supported algorithms does not depend upon the keys, trust, or
rng, although they will influence which algorithms are eventually used. */
- context.init(null, null, null)
- } catch {
- case npe: NullPointerException =>
- logDebug("No SSL protocol specified")
- context = SSLContext.getDefault
- case nsa: NoSuchAlgorithmException =>
- logDebug(s"No support for requested SSL protocol ${protocol.get}")
- context = SSLContext.getDefault
+ context.init(null, null, null)
+ } catch {
+ case nsa: NoSuchAlgorithmException =>
+ logDebug(s"No support for requested SSL protocol ${protocol.get}")
+ context = SSLContext.getDefault
+ }
}
val providerAlgorithms = context.getServerSocketFactory.getSupportedCipherSuites.toSet
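The hunk above boils down to the following standalone sketch (hedged: `protocol` here is a hypothetical `Option[String]`, and this shows the pattern, not the patched file): test the Option explicitly instead of passing `orNull` and catching the NullPointerException it provokes.

```scala
import java.security.NoSuchAlgorithmException
import javax.net.ssl.SSLContext

// Sketch of the pattern above, assuming a hypothetical protocol: Option[String].
object SslContextSketch {
  def contextFor(protocol: Option[String]): SSLContext = protocol match {
    case None => SSLContext.getDefault // no protocol specified
    case Some(p) =>
      try SSLContext.getInstance(p)
      catch { case _: NoSuchAlgorithmException => SSLContext.getDefault }
  }
}
```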
http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/core/src/main/scala/org/apache/spark/SparkEnv.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 3196c1e..45ed986 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -420,7 +420,7 @@ object SparkEnv extends Logging {
if (!conf.contains("spark.scheduler.mode")) {
Seq(("spark.scheduler.mode", schedulingMode))
} else {
- Seq[(String, String)]()
+ Seq.empty[(String, String)]
}
val sparkProperties = (conf.getAll ++ schedulerMode).sorted
http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/core/src/main/scala/org/apache/spark/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/TestUtils.scala b/core/src/main/scala/org/apache/spark/TestUtils.scala
index 3f912dc..a80016d 100644
--- a/core/src/main/scala/org/apache/spark/TestUtils.scala
+++ b/core/src/main/scala/org/apache/spark/TestUtils.scala
@@ -58,8 +58,8 @@ private[spark] object TestUtils {
def createJarWithClasses(
classNames: Seq[String],
toStringValue: String = "",
- classNamesWithBase: Seq[(String, String)] = Seq(),
- classpathUrls: Seq[URL] = Seq()): URL = {
+ classNamesWithBase: Seq[(String, String)] = Seq.empty,
+ classpathUrls: Seq[URL] = Seq.empty): URL = {
val tempDir = Utils.createTempDir()
val files1 = for (name <- classNames) yield {
createCompiledClass(name, tempDir, toStringValue, classpathUrls = classpathUrls)
@@ -137,7 +137,7 @@ private[spark] object TestUtils {
val options = if (classpathUrls.nonEmpty) {
Seq("-classpath", classpathUrls.map { _.getFile }.mkString(File.pathSeparator))
} else {
- Seq()
+ Seq.empty
}
compiler.getTask(null, null, null, options.asJava, null, Arrays.asList(sourceFile)).call()
@@ -160,7 +160,7 @@ private[spark] object TestUtils {
destDir: File,
toStringValue: String = "",
baseClass: String = null,
- classpathUrls: Seq[URL] = Seq()): File = {
+ classpathUrls: Seq[URL] = Seq.empty): File = {
val extendsText = Option(baseClass).map { c => s" extends ${c}" }.getOrElse("")
val sourceFile = new JavaSourceFromString(className,
"public class " + className + extendsText + " implements java.io.Serializable {" +
http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index fb0405b..6a81752 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -974,6 +974,7 @@ private[spark] class PythonBroadcast(@transient var path: String) extends Serial
}
}
}
+ super.finalize()
}
}
// scalastyle:on no.finalize
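The added line addresses the "finalizer without super" warning: an overridden finalize() should chain to super.finalize() so parent-class cleanup still runs. A minimal hedged sketch of the rule (the resource here is hypothetical):

```scala
// Minimal sketch of the rule, with a hypothetical native resource.
class ResourceHolder {
  private var handle: java.io.InputStream = _

  override def finalize(): Unit = {
    try {
      if (handle != null) handle.close()
    } finally {
      super.finalize() // the fix: always chain to the superclass finalizer
    }
  }
}
```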
http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/core/src/main/scala/org/apache/spark/api/r/SerDe.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/r/SerDe.scala b/core/src/main/scala/org/apache/spark/api/r/SerDe.scala
index dad928c..537ab57 100644
--- a/core/src/main/scala/org/apache/spark/api/r/SerDe.scala
+++ b/core/src/main/scala/org/apache/spark/api/r/SerDe.scala
@@ -128,8 +128,7 @@ private[spark] object SerDe {
}
def readBoolean(in: DataInputStream): Boolean = {
- val intVal = in.readInt()
- if (intVal == 0) false else true
+ in.readInt() != 0
}
def readDate(in: DataInputStream): Date = {
http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 6afe58b..ccbabf0 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -337,7 +337,7 @@ class SparkHadoopUtil extends Logging {
if (credentials != null) {
credentials.getAllTokens.asScala.map(tokenToString)
} else {
- Seq()
+ Seq.empty
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index b2a50bd..687fd2d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -317,7 +317,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
val newLastScanTime = getNewLastScanTime()
logDebug(s"Scanning $logDir with lastScanTime==$lastScanTime")
val statusList = Option(fs.listStatus(new Path(logDir))).map(_.toSeq)
- .getOrElse(Seq[FileStatus]())
+ .getOrElse(Seq.empty[FileStatus])
// scan for modified applications, replay and merge them
val logInfos: Seq[FileStatus] = statusList
.filter { entry =>
http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
index 8cfd0f6..e42f41b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
@@ -55,7 +55,7 @@ class MasterWebUI(
}
def addProxyTargets(id: String, target: String): Unit = {
- var endTarget = target.stripSuffix("/")
+ val endTarget = target.stripSuffix("/")
val handler = createProxyHandler("/proxy/" + id, endTarget)
attachHandler(handler)
proxyHandlers(id) = handler
http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
index cba4aaf..12e0dae 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
@@ -44,7 +44,7 @@ object CommandUtils extends Logging {
memory: Int,
sparkHome: String,
substituteArguments: String => String,
- classPaths: Seq[String] = Seq[String](),
+ classPaths: Seq[String] = Seq.empty,
env: Map[String, String] = sys.env): ProcessBuilder = {
val localCommand = buildLocalCommand(
command, securityMgr, substituteArguments, classPaths, env)
@@ -73,7 +73,7 @@ object CommandUtils extends Logging {
command: Command,
securityMgr: SecurityManager,
substituteArguments: String => String,
- classPath: Seq[String] = Seq[String](),
+ classPath: Seq[String] = Seq.empty,
env: Map[String, String]): Command = {
val libraryPathName = Utils.libraryPathEnvName
val libraryPathEntries = command.libraryPathEntries
@@ -96,7 +96,7 @@ object CommandUtils extends Logging {
command.arguments.map(substituteArguments),
newEnvironment,
command.classPathEntries ++ classPath,
- Seq[String](), // library path already captured in environment variable
+ Seq.empty, // library path already captured in environment variable
// filter out auth secret from java options
command.javaOpts.filterNot(_.startsWith("-D" + SecurityManager.SPARK_AUTH_SECRET_CONF)))
}
http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/core/src/main/scala/org/apache/spark/package.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/package.scala b/core/src/main/scala/org/apache/spark/package.scala
index 2610d6f..8058a4d 100644
--- a/core/src/main/scala/org/apache/spark/package.scala
+++ b/core/src/main/scala/org/apache/spark/package.scala
@@ -17,6 +17,8 @@
package org.apache
+import java.util.Properties
+
/**
* Core Spark functionality. [[org.apache.spark.SparkContext]] serves as the main entry point to
* Spark, while [[org.apache.spark.rdd.RDD]] is the data type representing a distributed collection,
@@ -40,9 +42,6 @@ package org.apache
* Developer API</span> are intended for advanced users want to extend Spark through lower
* level interfaces. These are subject to changes or removal in minor releases.
*/
-
-import java.util.Properties
-
package object spark {
private object SparkBuildInfo {
@@ -57,6 +56,9 @@ package object spark {
val resourceStream = Thread.currentThread().getContextClassLoader.
getResourceAsStream("spark-version-info.properties")
+ if (resourceStream == null) {
+ throw new SparkException("Could not find spark-version-info.properties")
+ }
try {
val unknownProp = "<unknown>"
@@ -71,8 +73,6 @@ package object spark {
props.getProperty("date", unknownProp)
)
} catch {
- case npe: NullPointerException =>
- throw new SparkException("Error while locating file spark-version-info.properties", npe)
case e: Exception =>
throw new SparkException("Error loading properties from spark-version-info.properties", e)
} finally {
http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
index 2cba1fe..10451a3 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
@@ -269,7 +269,7 @@ private class DefaultPartitionCoalescer(val balanceSlack: Double = 0.10)
tries = 0
// if we don't have enough partition groups, create duplicates
while (numCreated < targetLen) {
- var (nxt_replica, nxt_part) = partitionLocs.partsWithLocs(tries)
+ val (nxt_replica, nxt_part) = partitionLocs.partsWithLocs(tries)
tries += 1
val pgroup = new PartitionGroup(Some(nxt_replica))
groupArr += pgroup
http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Pool.scala b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
index 1181371..f4b0ab1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
@@ -97,7 +97,7 @@ private[spark] class Pool(
}
override def getSortedTaskSetQueue: ArrayBuffer[TaskSetManager] = {
- var sortedTaskSetQueue = new ArrayBuffer[TaskSetManager]
+ val sortedTaskSetQueue = new ArrayBuffer[TaskSetManager]
val sortedSchedulableQueue =
schedulableQueue.asScala.toSeq.sortWith(taskSetSchedulingAlgorithm.comparator)
for (schedulable <- sortedSchedulableQueue) {
http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
index 366b92c..836769e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
@@ -60,7 +60,7 @@ private[spark] class DirectTaskResult[T](
val numUpdates = in.readInt
if (numUpdates == 0) {
- accumUpdates = Seq()
+ accumUpdates = Seq.empty
} else {
val _accumUpdates = new ArrayBuffer[AccumulatorV2[_, _]]
for (i <- 0 until numUpdates) {
http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 3968fb7..589fe67 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -891,7 +891,7 @@ private[spark] class TaskSetManager(
override def removeSchedulable(schedulable: Schedulable) {}
override def getSortedTaskSetQueue(): ArrayBuffer[TaskSetManager] = {
- var sortedTaskSetQueue = new ArrayBuffer[TaskSetManager]()
+ val sortedTaskSetQueue = new ArrayBuffer[TaskSetManager]()
sortedTaskSetQueue += this
sortedTaskSetQueue
}
@@ -948,7 +948,7 @@ private[spark] class TaskSetManager(
if (tasksSuccessful >= minFinishedForSpeculation && tasksSuccessful > 0) {
val time = clock.getTimeMillis()
- var medianDuration = successfulTaskDurations.median
+ val medianDuration = successfulTaskDurations.median
val threshold = max(SPECULATION_MULTIPLIER * medianDuration, minTimeToSpeculation)
// TODO: Threshold should also look at standard deviation of task durations and have a lower
// bound based on that.
http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 0b396b7..a46824a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -23,7 +23,6 @@ import javax.annotation.concurrent.GuardedBy
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import scala.concurrent.Future
-import scala.concurrent.duration.Duration
import org.apache.spark.{ExecutorAllocationClient, SparkEnv, SparkException, TaskState}
import org.apache.spark.internal.Logging
@@ -427,11 +426,11 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
* be called in the yarn-client mode when AM re-registers after a failure.
* */
protected def reset(): Unit = {
- val executors = synchronized {
+ val executors: Set[String] = synchronized {
requestedTotalExecutors = 0
numPendingExecutors = 0
executorsPendingToRemove.clear()
- Set() ++ executorDataMap.keys
+ executorDataMap.keys.toSet
}
// Remove all the lingering executors that should be removed but not yet. The reason might be
http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index adbe3cf..aaacabe 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -1275,11 +1275,11 @@ private[spark] class BlockManager(
val numPeersToReplicateTo = level.replication - 1
val startTime = System.nanoTime
- var peersReplicatedTo = mutable.HashSet.empty ++ existingReplicas
- var peersFailedToReplicateTo = mutable.HashSet.empty[BlockManagerId]
+ val peersReplicatedTo = mutable.HashSet.empty ++ existingReplicas
+ val peersFailedToReplicateTo = mutable.HashSet.empty[BlockManagerId]
var numFailures = 0
- val initialPeers = getPeers(false).filterNot(existingReplicas.contains(_))
+ val initialPeers = getPeers(false).filterNot(existingReplicas.contains)
var peersForReplication = blockReplicationPolicy.prioritize(
blockManagerId,
http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
index cce7a76..a7f2caa 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
@@ -241,7 +241,7 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") {
}.getOrElse(jobIdTitle)
val jobSortDesc = Option(parameterJobSortDesc).map(_.toBoolean).getOrElse(
// New jobs should be shown above old jobs by default.
- if (jobSortColumn == jobIdTitle) true else false
+ jobSortColumn == jobIdTitle
)
val jobPageSize = Option(parameterJobPageSize).map(_.toInt).getOrElse(100)
val jobPrevPageSize = Option(parameterJobPrevPageSize).map(_.toInt).getOrElse(jobPageSize)
http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala
index 2b0816e..a30c135 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala
@@ -115,7 +115,7 @@ private[ui] class AllStagesPage(parent: StagesTab) extends WebUIPage("") {
if (sc.isDefined && isFairScheduler) {
<h4>{pools.size} Fair Scheduler Pools</h4> ++ poolTable.toNodeSeq
} else {
- Seq[Node]()
+ Seq.empty[Node]
}
}
if (shouldShowActiveStages) {
http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala
index b164f32..819fe57 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala
@@ -41,7 +41,7 @@ private[ui] class PoolPage(parent: StagesTab) extends WebUIPage("pool") {
val poolToActiveStages = listener.poolToActiveStages
val activeStages = poolToActiveStages.get(poolName) match {
case Some(s) => s.values.toSeq
- case None => Seq[StageInfo]()
+ case None => Seq.empty[StageInfo]
}
val shouldShowActiveStages = activeStages.nonEmpty
val activeStagesTable =
http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 6b3dadc..8ed5174 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -565,7 +565,7 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
val executorTable = new ExecutorTable(stageId, stageAttemptId, parent)
val maybeAccumulableTable: Seq[Node] =
- if (hasAccumulators) { <h4>Accumulators</h4> ++ accumulableTable } else Seq()
+ if (hasAccumulators) { <h4>Accumulators</h4> ++ accumulableTable } else Seq.empty
val aggMetrics =
<span class="collapse-aggregated-metrics collapse-table"
http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
index a28daf7..f0a12a2 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
@@ -60,7 +60,7 @@ private[ui] class StageTableBase(
}.getOrElse("Stage Id")
val stageSortDesc = Option(parameterStageSortDesc).map(_.toBoolean).getOrElse(
// New stages should be shown above old jobs by default.
- if (stageSortColumn == "Stage Id") true else false
+ stageSortColumn == "Stage Id"
)
val stagePageSize = Option(parameterStagePageSize).map(_.toInt).getOrElse(100)
val stagePrevPageSize = Option(parameterStagePrevPageSize).map(_.toInt)
http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
index 317e0aa..e8ff08f 100644
--- a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
@@ -51,7 +51,7 @@ private[ui] class RDDPage(parent: StorageTab) extends WebUIPage("rdd") {
val rddStorageInfo = AllRDDResource.getRDDStorageInfo(rddId, listener, includeDetails = true)
.getOrElse {
// Rather than crashing, render an "RDD Not Found" page
- return UIUtils.headerSparkPage("RDD Not Found", Seq[Node](), parent)
+ return UIUtils.headerSparkPage("RDD Not Found", Seq.empty[Node], parent)
}
// Worker table
http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
index 489688c..48a1d7b 100644
--- a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
@@ -81,7 +81,7 @@ private[spark] object ClosureCleaner extends Logging {
val stack = Stack[Class[_]](obj.getClass)
while (!stack.isEmpty) {
val cr = getClassReader(stack.pop())
- val set = Set[Class[_]]()
+ val set = Set.empty[Class[_]]
cr.accept(new InnerClosureFinder(set), 0)
for (cls <- set -- seen) {
seen += cls
@@ -180,16 +180,18 @@ private[spark] object ClosureCleaner extends Logging {
val declaredFields = func.getClass.getDeclaredFields
val declaredMethods = func.getClass.getDeclaredMethods
- logDebug(" + declared fields: " + declaredFields.size)
- declaredFields.foreach { f => logDebug(" " + f) }
- logDebug(" + declared methods: " + declaredMethods.size)
- declaredMethods.foreach { m => logDebug(" " + m) }
- logDebug(" + inner classes: " + innerClasses.size)
- innerClasses.foreach { c => logDebug(" " + c.getName) }
- logDebug(" + outer classes: " + outerClasses.size)
- outerClasses.foreach { c => logDebug(" " + c.getName) }
- logDebug(" + outer objects: " + outerObjects.size)
- outerObjects.foreach { o => logDebug(" " + o) }
+ if (log.isDebugEnabled) {
+ logDebug(" + declared fields: " + declaredFields.size)
+ declaredFields.foreach { f => logDebug(" " + f) }
+ logDebug(" + declared methods: " + declaredMethods.size)
+ declaredMethods.foreach { m => logDebug(" " + m) }
+ logDebug(" + inner classes: " + innerClasses.size)
+ innerClasses.foreach { c => logDebug(" " + c.getName) }
+ logDebug(" + outer classes: " + outerClasses.size)
+ outerClasses.foreach { c => logDebug(" " + c.getName) }
+ logDebug(" + outer objects: " + outerObjects.size)
+ outerObjects.foreach { o => logDebug(" " + o) }
+ }
// Fail fast if we detect return statements in closures
getClassReader(func.getClass).accept(new ReturnStatementFinder(), 0)
@@ -201,7 +203,7 @@ private[spark] object ClosureCleaner extends Logging {
// Initialize accessed fields with the outer classes first
// This step is needed to associate the fields to the correct classes later
for (cls <- outerClasses) {
- accessedFields(cls) = Set[String]()
+ accessedFields(cls) = Set.empty[String]
}
// Populate accessed fields by visiting all fields and methods accessed by this and
// all of its inner closures. If transitive cleaning is enabled, this may recursively
http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 806d14e..8406826 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -696,7 +696,7 @@ private[spark] object JsonProtocol {
val accumulatedValues = {
Utils.jsonOption(json \ "Accumulables").map(_.extract[List[JValue]]) match {
case Some(values) => values.map(accumulableInfoFromJson)
- case None => Seq[AccumulableInfo]()
+ case None => Seq.empty[AccumulableInfo]
}
}
@@ -726,7 +726,7 @@ private[spark] object JsonProtocol {
val killed = Utils.jsonOption(json \ "Killed").exists(_.extract[Boolean])
val accumulables = Utils.jsonOption(json \ "Accumulables").map(_.extract[Seq[JValue]]) match {
case Some(values) => values.map(accumulableInfoFromJson)
- case None => Seq[AccumulableInfo]()
+ case None => Seq.empty[AccumulableInfo]
}
val taskInfo =
http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 584337a..d661293 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1443,7 +1443,7 @@ private[spark] object Utils extends Logging {
var firstUserFile = "<unknown>"
var firstUserLine = 0
var insideSpark = true
- var callStack = new ArrayBuffer[String]() :+ "<unknown>"
+ val callStack = new ArrayBuffer[String]() :+ "<unknown>"
Thread.currentThread.getStackTrace().foreach { ste: StackTraceElement =>
// When running under some profilers, the current stack trace might contain some bogus
@@ -2438,7 +2438,7 @@ private[spark] object Utils extends Logging {
.getOrElse(UserGroupInformation.getCurrentUser().getShortUserName())
}
- val EMPTY_USER_GROUPS = Set[String]()
+ val EMPTY_USER_GROUPS = Set.empty[String]
// Returns the groups to which the current user belongs.
def getCurrentUserGroups(sparkConf: SparkConf, username: String): Set[String] = {
@@ -2587,7 +2587,7 @@ private[spark] object Utils extends Logging {
* Unions two comma-separated lists of files and filters out empty strings.
*/
def unionFileLists(leftList: Option[String], rightList: Option[String]): Set[String] = {
- var allFiles = Set[String]()
+ var allFiles = Set.empty[String]
leftList.foreach { value => allFiles ++= value.split(",") }
rightList.foreach { value => allFiles ++= value.split(",") }
allFiles.filter { _.nonEmpty }
http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/examples/src/main/scala/org/apache/spark/examples/LocalFileLR.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/LocalFileLR.scala b/examples/src/main/scala/org/apache/spark/examples/LocalFileLR.scala
index a897cad..8dbb7ee 100644
--- a/examples/src/main/scala/org/apache/spark/examples/LocalFileLR.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/LocalFileLR.scala
@@ -53,16 +53,16 @@ object LocalFileLR {
val fileSrc = scala.io.Source.fromFile(args(0))
val lines = fileSrc.getLines().toArray
- val points = lines.map(parsePoint _)
+ val points = lines.map(parsePoint)
val ITERATIONS = args(1).toInt
// Initialize w to a random value
- var w = DenseVector.fill(D) {2 * rand.nextDouble - 1}
+ val w = DenseVector.fill(D) {2 * rand.nextDouble - 1}
println("Initial w: " + w)
for (i <- 1 to ITERATIONS) {
println("On iteration " + i)
- var gradient = DenseVector.zeros[Double](D)
+ val gradient = DenseVector.zeros[Double](D)
for (p <- points) {
val scale = (1 / (1 + math.exp(-p.y * (w.dot(p.x)))) - 1) * p.y
gradient += p.x * scale
http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/examples/src/main/scala/org/apache/spark/examples/LocalKMeans.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/LocalKMeans.scala b/examples/src/main/scala/org/apache/spark/examples/LocalKMeans.scala
index fca585c..963c9a5 100644
--- a/examples/src/main/scala/org/apache/spark/examples/LocalKMeans.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/LocalKMeans.scala
@@ -47,12 +47,11 @@ object LocalKMeans {
}
def closestPoint(p: Vector[Double], centers: HashMap[Int, Vector[Double]]): Int = {
- var index = 0
var bestIndex = 0
var closest = Double.PositiveInfinity
for (i <- 1 to centers.size) {
- val vCurr = centers.get(i).get
+ val vCurr = centers(i)
val tempDist = squaredDistance(p, vCurr)
if (tempDist < closest) {
closest = tempDist
@@ -76,8 +75,8 @@ object LocalKMeans {
showWarning()
val data = generateData
- var points = new HashSet[Vector[Double]]
- var kPoints = new HashMap[Int, Vector[Double]]
+ val points = new HashSet[Vector[Double]]
+ val kPoints = new HashMap[Int, Vector[Double]]
var tempDist = 1.0
while (points.size < K) {
@@ -92,11 +91,11 @@ object LocalKMeans {
println("Initial centers: " + kPoints)
while(tempDist > convergeDist) {
- var closest = data.map (p => (closestPoint(p, kPoints), (p, 1)))
+ val closest = data.map (p => (closestPoint(p, kPoints), (p, 1)))
- var mappings = closest.groupBy[Int] (x => x._1)
+ val mappings = closest.groupBy[Int] (x => x._1)
- var pointStats = mappings.map { pair =>
+ val pointStats = mappings.map { pair =>
pair._2.reduceLeft [(Int, (Vector[Double], Int))] {
case ((id1, (p1, c1)), (id2, (p2, c2))) => (id1, (p1 + p2, c1 + c2))
}
@@ -107,7 +106,7 @@ object LocalKMeans {
tempDist = 0.0
for (mapping <- newPoints) {
- tempDist += squaredDistance(kPoints.get(mapping._1).get, mapping._2)
+ tempDist += squaredDistance(kPoints(mapping._1), mapping._2)
}
for (newP <- newPoints) {
http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/examples/src/main/scala/org/apache/spark/examples/LocalLR.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/LocalLR.scala b/examples/src/main/scala/org/apache/spark/examples/LocalLR.scala
index 13ccc2a..eb5221f 100644
--- a/examples/src/main/scala/org/apache/spark/examples/LocalLR.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/LocalLR.scala
@@ -60,12 +60,12 @@ object LocalLR {
val data = generateData
// Initialize w to a random value
- var w = DenseVector.fill(D) {2 * rand.nextDouble - 1}
+ val w = DenseVector.fill(D) {2 * rand.nextDouble - 1}
println("Initial w: " + w)
for (i <- 1 to ITERATIONS) {
println("On iteration " + i)
- var gradient = DenseVector.zeros[Double](D)
+ val gradient = DenseVector.zeros[Double](D)
for (p <- data) {
val scale = (1 / (1 + math.exp(-p.y * (w.dot(p.x)))) - 1) * p.y
gradient += p.x * scale
http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala b/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala
index 05ac6cb..9d675bb 100644
--- a/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala
@@ -40,8 +40,8 @@ object SparkHdfsLR {
def parsePoint(line: String): DataPoint = {
val tok = new java.util.StringTokenizer(line, " ")
- var y = tok.nextToken.toDouble
- var x = new Array[Double](D)
+ val y = tok.nextToken.toDouble
+ val x = new Array[Double](D)
var i = 0
while (i < D) {
x(i) = tok.nextToken.toDouble; i += 1
@@ -78,7 +78,7 @@ object SparkHdfsLR {
val ITERATIONS = args(1).toInt
// Initialize w to a random value
- var w = DenseVector.fill(D) {2 * rand.nextDouble - 1}
+ val w = DenseVector.fill(D) {2 * rand.nextDouble - 1}
println("Initial w: " + w)
for (i <- 1 to ITERATIONS) {
http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/examples/src/main/scala/org/apache/spark/examples/SparkLR.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkLR.scala b/examples/src/main/scala/org/apache/spark/examples/SparkLR.scala
index cb2be09..c18e3d3 100644
--- a/examples/src/main/scala/org/apache/spark/examples/SparkLR.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/SparkLR.scala
@@ -72,7 +72,7 @@ object SparkLR {
val points = spark.sparkContext.parallelize(generateData, numSlices).cache()
// Initialize w to a random value
- var w = DenseVector.fill(D) {2 * rand.nextDouble - 1}
+ val w = DenseVector.fill(D) {2 * rand.nextDouble - 1}
println("Initial w: " + w)
for (i <- 1 to ITERATIONS) {
http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/examples/src/main/scala/org/apache/spark/examples/ml/DecisionTreeExample.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/DecisionTreeExample.scala b/examples/src/main/scala/org/apache/spark/examples/ml/DecisionTreeExample.scala
index b03701e..19f2d77 100644
--- a/examples/src/main/scala/org/apache/spark/examples/ml/DecisionTreeExample.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/ml/DecisionTreeExample.scala
@@ -251,7 +251,7 @@ object DecisionTreeExample {
.setMinInfoGain(params.minInfoGain)
.setCacheNodeIds(params.cacheNodeIds)
.setCheckpointInterval(params.checkpointInterval)
- case _ => throw new IllegalArgumentException("Algo ${params.algo} not supported.")
+ case _ => throw new IllegalArgumentException(s"Algo ${params.algo} not supported.")
}
stages += dt
val pipeline = new Pipeline().setStages(stages.toArray)
@@ -278,7 +278,7 @@ object DecisionTreeExample {
} else {
println(treeModel) // Print model summary.
}
- case _ => throw new IllegalArgumentException("Algo ${params.algo} not supported.")
+ case _ => throw new IllegalArgumentException(s"Algo ${params.algo} not supported.")
}
// Evaluate model on training, test data.
@@ -294,7 +294,7 @@ object DecisionTreeExample {
println("Test data results:")
evaluateRegressionModel(pipelineModel, test, labelColName)
case _ =>
- throw new IllegalArgumentException("Algo ${params.algo} not supported.")
+ throw new IllegalArgumentException(s"Algo ${params.algo} not supported.")
}
spark.stop()
http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/examples/src/main/scala/org/apache/spark/examples/ml/GBTExample.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/GBTExample.scala b/examples/src/main/scala/org/apache/spark/examples/ml/GBTExample.scala
index 3bd8ff5..8f3ce4b 100644
--- a/examples/src/main/scala/org/apache/spark/examples/ml/GBTExample.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/ml/GBTExample.scala
@@ -190,7 +190,7 @@ object GBTExample {
.setCacheNodeIds(params.cacheNodeIds)
.setCheckpointInterval(params.checkpointInterval)
.setMaxIter(params.maxIter)
- case _ => throw new IllegalArgumentException("Algo ${params.algo} not supported.")
+ case _ => throw new IllegalArgumentException(s"Algo ${params.algo} not supported.")
}
stages += dt
val pipeline = new Pipeline().setStages(stages.toArray)
@@ -217,7 +217,7 @@ object GBTExample {
} else {
println(rfModel) // Print model summary.
}
- case _ => throw new IllegalArgumentException("Algo ${params.algo} not supported.")
+ case _ => throw new IllegalArgumentException(s"Algo ${params.algo} not supported.")
}
// Evaluate model on training, test data.
@@ -233,7 +233,7 @@ object GBTExample {
println("Test data results:")
DecisionTreeExample.evaluateRegressionModel(pipelineModel, test, labelColName)
case _ =>
- throw new IllegalArgumentException("Algo ${params.algo} not supported.")
+ throw new IllegalArgumentException(s"Algo ${params.algo} not supported.")
}
spark.stop()
http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/examples/src/main/scala/org/apache/spark/examples/ml/RandomForestExample.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/RandomForestExample.scala b/examples/src/main/scala/org/apache/spark/examples/ml/RandomForestExample.scala
index a735c21..3c127a4 100644
--- a/examples/src/main/scala/org/apache/spark/examples/ml/RandomForestExample.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/ml/RandomForestExample.scala
@@ -198,7 +198,7 @@ object RandomForestExample {
.setCheckpointInterval(params.checkpointInterval)
.setFeatureSubsetStrategy(params.featureSubsetStrategy)
.setNumTrees(params.numTrees)
- case _ => throw new IllegalArgumentException("Algo ${params.algo} not supported.")
+ case _ => throw new IllegalArgumentException(s"Algo ${params.algo} not supported.")
}
stages += dt
val pipeline = new Pipeline().setStages(stages.toArray)
@@ -225,7 +225,7 @@ object RandomForestExample {
} else {
println(rfModel) // Print model summary.
}
- case _ => throw new IllegalArgumentException("Algo ${params.algo} not supported.")
+ case _ => throw new IllegalArgumentException(s"Algo ${params.algo} not supported.")
}
// Evaluate model on training, test data.
@@ -241,7 +241,7 @@ object RandomForestExample {
println("Test data results:")
DecisionTreeExample.evaluateRegressionModel(pipelineModel, test, labelColName)
case _ =>
- throw new IllegalArgumentException("Algo ${params.algo} not supported.")
+ throw new IllegalArgumentException(s"Algo ${params.algo} not supported.")
}
spark.stop()
http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/examples/src/main/scala/org/apache/spark/examples/mllib/DecisionTreeRunner.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/DecisionTreeRunner.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/DecisionTreeRunner.scala
index 0ad0465..fa47e12 100644
--- a/examples/src/main/scala/org/apache/spark/examples/mllib/DecisionTreeRunner.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/mllib/DecisionTreeRunner.scala
@@ -211,7 +211,7 @@ object DecisionTreeRunner {
case Regression =>
(origExamples, null, 0)
case _ =>
- throw new IllegalArgumentException("Algo ${params.algo} not supported.")
+ throw new IllegalArgumentException(s"Algo $algo not supported.")
}
// Create training, test sets.
http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
index f86b8f5..5915d9f 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
@@ -378,7 +378,7 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
zkUtils.getLeaderForPartition(topic, partition).isDefined &&
Request.isValidBrokerId(leaderAndInSyncReplicas.leader) &&
- leaderAndInSyncReplicas.isr.size >= 1
+ leaderAndInSyncReplicas.isr.nonEmpty
case _ =>
false
http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala
index 62cdf5b..d9fc9cc 100644
--- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala
@@ -156,7 +156,7 @@ private[spark] class KafkaRDD[K, V](
val prefExecs = if (null == prefHost) allExecs else allExecs.filter(_.host == prefHost)
val execs = if (prefExecs.isEmpty) allExecs else prefExecs
if (execs.isEmpty) {
- Seq()
+ Seq.empty
} else {
// execs is sorted, tp.hashCode depends only on topic and partition, so consistent index
val index = Math.floorMod(tp.hashCode, execs.length)
http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
index 8273c2b..6c7024e 100644
--- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
@@ -257,7 +257,7 @@ private[kafka010] class KafkaTestUtils extends Logging {
zkUtils.getLeaderForPartition(topic, partition).isDefined &&
Request.isValidBrokerId(leaderAndInSyncReplicas.leader) &&
- leaderAndInSyncReplicas.isr.size >= 1
+ leaderAndInSyncReplicas.isr.nonEmpty
case _ =>
false
http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala
index 26349f4..0e6a340 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala
@@ -388,7 +388,7 @@ class EdgePartition[
val aggregates = new Array[A](vertexAttrs.length)
val bitset = new BitSet(vertexAttrs.length)
- var ctx = new AggregatingEdgeContext[VD, ED, A](mergeMsg, aggregates, bitset)
+ val ctx = new AggregatingEdgeContext[VD, ED, A](mergeMsg, aggregates, bitset)
var i = 0
while (i < size) {
val localSrcId = localSrcIds(i)
@@ -433,7 +433,7 @@ class EdgePartition[
val aggregates = new Array[A](vertexAttrs.length)
val bitset = new BitSet(vertexAttrs.length)
- var ctx = new AggregatingEdgeContext[VD, ED, A](mergeMsg, aggregates, bitset)
+ val ctx = new AggregatingEdgeContext[VD, ED, A](mergeMsg, aggregates, bitset)
index.iterator.foreach { cluster =>
val clusterSrcId = cluster._1
val clusterPos = cluster._2
http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
index 5d2a537..34e1253 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
@@ -74,7 +74,7 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
override def getCheckpointFiles: Seq[String] = {
Seq(vertices.getCheckpointFile, replicatedVertexView.edges.getCheckpointFile).flatMap {
case Some(path) => Seq(path)
- case None => Seq()
+ case None => Seq.empty
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala b/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala
index 2b3e5f9..4197311 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala
@@ -18,6 +18,7 @@
package org.apache.spark.graphx.util
import scala.annotation.tailrec
+import scala.collection.mutable
import scala.reflect.ClassTag
import scala.util._
@@ -133,7 +134,7 @@ object GraphGenerators extends Logging {
throw new IllegalArgumentException(
s"numEdges must be <= $numEdgesUpperBound but was $numEdges")
}
- var edges: Set[Edge[Int]] = Set()
+ var edges = mutable.Set.empty[Edge[Int]]
while (edges.size < numEdges) {
if (edges.size % 100 == 0) {
logDebug(edges.size + " edges")
http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
index b234bc4..65b09e5 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
@@ -736,7 +736,7 @@ class LogisticRegression @Since("1.2.0") (
b_k' = b_k - \mean(b_k)
}}}
*/
- val rawIntercepts = histogram.map(c => math.log(c + 1)) // add 1 for smoothing
+ val rawIntercepts = histogram.map(math.log1p) // add 1 for smoothing (log1p(x) = log(1+x))
val rawMean = rawIntercepts.sum / rawIntercepts.length
rawIntercepts.indices.foreach { i =>
initialCoefWithInterceptMatrix.update(i, numFeatures, rawIntercepts(i) - rawMean)
@@ -820,7 +820,7 @@ class LogisticRegression @Since("1.2.0") (
val interceptVec = if ($(fitIntercept) || !isMultinomial) {
Vectors.zeros(numCoefficientSets)
} else {
- Vectors.sparse(numCoefficientSets, Seq())
+ Vectors.sparse(numCoefficientSets, Seq.empty)
}
// separate intercepts and coefficients from the combined matrix
allCoefMatrix.foreachActive { (classIndex, featureIndex, value) =>
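The math.log1p change above is about floating-point accuracy rather than speed: when c is tiny, 1 + c rounds back to 1.0 and math.log(c + 1) returns exactly 0. A quick standalone check:

```scala
val c = 1e-18
println(math.log(c + 1)) // 0.0: the addition rounds c away entirely
println(math.log1p(c))   // ~1.0e-18: precision preserved
```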
http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/mllib/src/main/scala/org/apache/spark/ml/feature/RFormulaParser.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/RFormulaParser.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/RFormulaParser.scala
index 2dd565a..32835fb 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/RFormulaParser.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/RFormulaParser.scala
@@ -99,7 +99,7 @@ private[ml] case class ParsedRFormula(label: ColumnRef, terms: Seq[Term]) {
}).map(_.distinct)
// Deduplicates feature interactions, for example, a:b is the same as b:a.
- var seen = mutable.Set[Set[String]]()
+ val seen = mutable.Set[Set[String]]()
validInteractions.flatMap {
case t if seen.contains(t.toSet) =>
None
http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
index 91cd229..ccc61fe 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
@@ -286,7 +286,7 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
s"training is not needed.")
}
if (handlePersistence) instances.unpersist()
- val coefficients = Vectors.sparse(numFeatures, Seq())
+ val coefficients = Vectors.sparse(numFeatures, Seq.empty)
val intercept = yMean
val model = copyValues(new LinearRegressionModel(uid, coefficients, intercept))
http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
index 98e50c5..49043b5 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
@@ -363,7 +363,7 @@ class KMeans private (
// to their squared distance from the centers. Note that only distances between points
// and new centers are computed in each iteration.
var step = 0
- var bcNewCentersList = ArrayBuffer[Broadcast[_]]()
+ val bcNewCentersList = ArrayBuffer[Broadcast[_]]()
while (step < initializationSteps) {
val bcNewCenters = data.context.broadcast(newCenters)
bcNewCentersList += bcNewCenters
http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/mllib/src/main/scala/org/apache/spark/mllib/linalg/EigenValueDecomposition.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/EigenValueDecomposition.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/EigenValueDecomposition.scala
index 7695aab..c7c1a54 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/EigenValueDecomposition.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/EigenValueDecomposition.scala
@@ -78,13 +78,13 @@ private[mllib] object EigenValueDecomposition {
require(n * ncv.toLong <= Integer.MAX_VALUE && ncv * (ncv.toLong + 8) <= Integer.MAX_VALUE,
s"k = $k and/or n = $n are too large to compute an eigendecomposition")
- var ido = new intW(0)
- var info = new intW(0)
- var resid = new Array[Double](n)
- var v = new Array[Double](n * ncv)
- var workd = new Array[Double](n * 3)
- var workl = new Array[Double](ncv * (ncv + 8))
- var ipntr = new Array[Int](11)
+ val ido = new intW(0)
+ val info = new intW(0)
+ val resid = new Array[Double](n)
+ val v = new Array[Double](n * ncv)
+ val workd = new Array[Double](n * 3)
+ val workl = new Array[Double](ncv * (ncv + 8))
+ val ipntr = new Array[Int](11)
// call ARPACK's reverse communication, first iteration with ido = 0
arpack.dsaupd(ido, bmat, n, which, nev.`val`, tolW, resid, ncv, v, n, iparam, ipntr, workd,
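These are in/out buffers for ARPACK's reverse-communication loop: dsaupd overwrites the array contents and intW holders in place, but the bindings never point anywhere else, so val is correct even though the data changes. The same shape in miniature:

    val workd = new Array[Double](30)  // val fixes the binding, not the contents
    java.util.Arrays.fill(workd, 0.0)  // a callee may still overwrite the array in place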
http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala
index efedebe..21ec287 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala
@@ -257,7 +257,7 @@ object LBFGS extends Logging {
(denseGrad1, loss1 + loss2)
}
- val zeroSparseVector = Vectors.sparse(n, Seq())
+ val zeroSparseVector = Vectors.sparse(n, Seq.empty)
val (gradientSum, lossSum) = data.treeAggregate((zeroSparseVector, 0.0))(seqOp, combOp)
// broadcasted model is not needed anymore
http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/SpearmanCorrelation.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/SpearmanCorrelation.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/SpearmanCorrelation.scala
index b760347..ee51d33 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/SpearmanCorrelation.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/SpearmanCorrelation.scala
@@ -57,7 +57,7 @@ private[stat] object SpearmanCorrelation extends Correlation with Logging {
var preCol = -1
var preVal = Double.NaN
var startRank = -1.0
- var cachedUids = ArrayBuffer.empty[Long]
+ val cachedUids = ArrayBuffer.empty[Long]
val flush: () => Iterable[(Long, (Int, Double))] = () => {
val averageRank = startRank + (cachedUids.size - 1) / 2.0
val output = cachedUids.map { uid =>
http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/mllib/src/main/scala/org/apache/spark/mllib/stat/test/StreamingTest.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/StreamingTest.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/StreamingTest.scala
index 551ea35..80c6ef0 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/StreamingTest.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/StreamingTest.scala
@@ -133,7 +133,7 @@ class StreamingTest @Since("1.6.0") () extends Logging with Serializable {
if (time.milliseconds > data.slideDuration.milliseconds * peacePeriod) {
rdd
} else {
- data.context.sparkContext.parallelize(Seq())
+ data.context.sparkContext.parallelize(Seq.empty)
}
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/project/SparkBuild.scala
----------------------------------------------------------------------
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 41f3a04..b9db1df 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -589,7 +589,7 @@ object PySparkAssembly {
val zipFile = new File(BuildCommons.sparkHome , "python/lib/pyspark.zip")
zipFile.delete()
zipRecursive(src, zipFile)
- Seq[File]()
+ Seq.empty[File]
}).value
)
@@ -810,7 +810,7 @@ object TestSettings {
require(d.mkdir() || d.isDirectory(), s"Failed to create directory $d")
}
}
- Seq[File]()
+ Seq.empty[File]
}).value,
concurrentRestrictions in Global += Tags.limit(Tags.Test, 1),
// Remove certain packages from Scaladoc
http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/ui/DriverPage.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/ui/DriverPage.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/ui/DriverPage.scala
index a6bb5d5..022191d 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/ui/DriverPage.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/ui/DriverPage.scala
@@ -112,7 +112,7 @@ private[ui] class DriverPage(parent: MesosClusterUI) extends WebUIPage("driver")
<td>Last Task Status</td>
<td>{state.mesosTaskStatus.map(_.toString).getOrElse("")}</td>
</tr>
- }.getOrElse(Seq[Node]())
+ }.getOrElse(Seq.empty[Node])
}
private def propertiesRow(properties: collection.Map[String, String]): Seq[Node] = {
@@ -175,6 +175,6 @@ private[ui] class DriverPage(parent: MesosClusterUI) extends WebUIPage("driver")
{state.retries}
</td>
</tr>
- }.getOrElse(Seq[Node]())
+ }.getOrElse(Seq.empty[Node])
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
index 062ed1f..7ec116c 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
@@ -333,7 +333,7 @@ trait MesosSchedulerUtils extends Logging {
try {
splitter.split(constraintsVal).asScala.toMap.mapValues(v =>
if (v == null || v.isEmpty) {
- Set[String]()
+ Set.empty[String]
} else {
v.split(',').toSet
}
http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 7745709..501e7e3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -2334,8 +2334,9 @@ object TimeWindowing extends Rule[LogicalPlan] {
val windowExpressions =
p.expressions.flatMap(_.collect { case t: TimeWindow => t }).toSet
+ val numWindowExpr = windowExpressions.size
// Only support a single window expression for now
- if (windowExpressions.size == 1 &&
+ if (numWindowExpr == 1 &&
windowExpressions.head.timeColumn.resolved &&
windowExpressions.head.checkInputDataTypes().isSuccess) {
@@ -2402,7 +2403,7 @@ object TimeWindowing extends Rule[LogicalPlan] {
renamedPlan.withNewChildren(substitutedPlan :: Nil)
}
- } else if (windowExpressions.size > 1) {
+ } else if (numWindowExpr > 1) {
p.failAnalysis("Multiple time window expressions would result in a cartesian product " +
"of rows, therefore they are currently not supported.")
} else {
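Hoisting windowExpressions.size into numWindowExpr evaluates the size once and gives both branches a shared, readable name. On a linear Seq, size is O(n), so caching it also avoids repeated traversals; here the collection is a Set, so the gain is mainly clarity. The general pattern, with a hypothetical helper:

    def describe(xs: List[String]): String = {
      val n = xs.size                    // List.size is O(n): compute it once
      if (n == 1) s"single: ${xs.head}"
      else if (n > 1) s"$n items"
      else "empty"
    }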
http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
index c863ba4..83a23cc 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
@@ -228,10 +228,10 @@ case class ArrayContains(left: Expression, right: Expression)
override def dataType: DataType = BooleanType
override def inputTypes: Seq[AbstractDataType] = right.dataType match {
- case NullType => Seq()
+ case NullType => Seq.empty
case _ => left.dataType match {
case n @ ArrayType(element, _) => Seq(n, element)
- case _ => Seq()
+ case _ => Seq.empty
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala
index db7baf6..064ca68 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala
@@ -150,7 +150,7 @@ object JoinReorderDP extends PredicateHelper with Logging {
// Create the initial plans: each plan is a single item with zero cost.
val itemIndex = items.zipWithIndex
val foundPlans = mutable.Buffer[JoinPlanMap](itemIndex.map {
- case (item, id) => Set(id) -> JoinPlan(Set(id), item, Set(), Cost(0, 0))
+ case (item, id) => Set(id) -> JoinPlan(Set(id), item, Set.empty, Cost(0, 0))
}.toMap)
// Build filters from the join graph to be used by the search algorithm.
http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
index 6c83f47..79a6c86 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
@@ -134,7 +134,7 @@ object ReorderAssociativeOperator extends Rule[LogicalPlan] {
private def collectGroupingExpressions(plan: LogicalPlan): ExpressionSet = plan match {
case Aggregate(groupingExpressions, aggregateExpressions, child) =>
ExpressionSet.apply(groupingExpressions)
- case _ => ExpressionSet(Seq())
+ case _ => ExpressionSet(Seq.empty)
}
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index ad359e7..45c1d3d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -877,7 +877,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
// Reverse the contexts to have them in the same sequence as in the SQL statement & turn them
// into expressions.
- val expressions = contexts.reverse.map(expression)
+ val expressions = contexts.reverseMap(expression)
// Create a balanced tree.
def reduceToExpressionTree(low: Int, high: Int): Expression = high - low match {
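reverse.map first materializes the reversed sequence and then maps over it; reverseMap does both in a single pass with no intermediate collection. A minimal illustration (reverseMap is available on Seq in the Scala 2.11/2.12 collections this code targets; 2.13 later deprecated it in favour of reverseIterator):

    List(1, 2, 3).reverse.map(_ * 10)  // List(30, 20, 10), builds an intermediate List
    List(1, 2, 3).reverseMap(_ * 10)   // List(30, 20, 10), single pass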
http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
index 7f370fb..8d034c2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
@@ -173,7 +173,7 @@ object ExtractFiltersAndInnerJoins extends PredicateHelper {
val (plans, conditions) = flattenJoin(j)
(plans, conditions ++ splitConjunctivePredicates(filterCondition))
- case _ => (Seq((plan, parentJoinType)), Seq())
+ case _ => (Seq((plan, parentJoinType)), Seq.empty)
}
def unapply(plan: LogicalPlan): Option[(Seq[(LogicalPlan, InnerLike)], Seq[Expression])]
http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
index e13db85..74820eb 100755
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
@@ -47,7 +47,7 @@ case class FilterEstimation(plan: Filter) extends Logging {
// Estimate selectivity of this filter predicate, and update column stats if needed.
// For not-supported condition, set filter selectivity to a conservative estimate 100%
- val filterSelectivity = calculateFilterSelectivity(plan.condition).getOrElse(BigDecimal(1.0))
+ val filterSelectivity = calculateFilterSelectivity(plan.condition).getOrElse(BigDecimal(1))
val filteredRowCount: BigInt = ceil(BigDecimal(childStats.rowCount.get) * filterSelectivity)
val newColStats = if (filteredRowCount == 0) {
@@ -83,13 +83,13 @@ case class FilterEstimation(plan: Filter) extends Logging {
: Option[BigDecimal] = {
condition match {
case And(cond1, cond2) =>
- val percent1 = calculateFilterSelectivity(cond1, update).getOrElse(BigDecimal(1.0))
- val percent2 = calculateFilterSelectivity(cond2, update).getOrElse(BigDecimal(1.0))
+ val percent1 = calculateFilterSelectivity(cond1, update).getOrElse(BigDecimal(1))
+ val percent2 = calculateFilterSelectivity(cond2, update).getOrElse(BigDecimal(1))
Some(percent1 * percent2)
case Or(cond1, cond2) =>
- val percent1 = calculateFilterSelectivity(cond1, update = false).getOrElse(BigDecimal(1.0))
- val percent2 = calculateFilterSelectivity(cond2, update = false).getOrElse(BigDecimal(1.0))
+ val percent1 = calculateFilterSelectivity(cond1, update = false).getOrElse(BigDecimal(1))
+ val percent2 = calculateFilterSelectivity(cond2, update = false).getOrElse(BigDecimal(1))
Some(percent1 + percent2 - (percent1 * percent2))
// Not-operator pushdown
@@ -464,7 +464,7 @@ case class FilterEstimation(plan: Filter) extends Logging {
(numericLiteral > max, numericLiteral <= min)
}
- var percent = BigDecimal(1.0)
+ var percent = BigDecimal(1)
if (noOverlap) {
percent = 0.0
} else if (completeOverlap) {
@@ -630,7 +630,7 @@ case class FilterEstimation(plan: Filter) extends Logging {
)
}
- var percent = BigDecimal(1.0)
+ var percent = BigDecimal(1)
if (noOverlap) {
percent = 0.0
} else if (completeOverlap) {
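The warning behind these BigDecimal changes is that a Double constructor argument carries the binary approximation of the literal, not the decimal you wrote; Int and String constructors are exact. The java.math constructor shows the failure mode most starkly:

    new java.math.BigDecimal(0.1)
    // 0.1000000000000000055511151231257827021181583404541015625
    new java.math.BigDecimal("0.1")   // exactly 0.1
    BigDecimal(1)                     // exact: built from an Int, no Double involved

Scala's own BigDecimal(Double) applies corrective rounding in recent versions, but the Int and String forms are unambiguous on any version, which is why the tests below switch to string literals for fractional values.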
http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathExpressionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathExpressionsSuite.scala
index f4d5a44..9ee7775 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathExpressionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathExpressionsSuite.scala
@@ -609,9 +609,9 @@ class MathExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
checkEvaluation(BRound(floatPi, scale), floatResults(i), EmptyRow)
}
- val bdResults: Seq[BigDecimal] = Seq(BigDecimal(3.0), BigDecimal(3.1), BigDecimal(3.14),
- BigDecimal(3.142), BigDecimal(3.1416), BigDecimal(3.14159),
- BigDecimal(3.141593), BigDecimal(3.1415927))
+ val bdResults: Seq[BigDecimal] = Seq(BigDecimal(3), BigDecimal("3.1"), BigDecimal("3.14"),
+ BigDecimal("3.142"), BigDecimal("3.1416"), BigDecimal("3.14159"),
+ BigDecimal("3.141593"), BigDecimal("3.1415927"))
(0 to 7).foreach { i =>
checkEvaluation(Round(bdPi, i), bdResults(i), EmptyRow)
http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DecimalSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DecimalSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DecimalSuite.scala
index 144f3d6..3193d13 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DecimalSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DecimalSuite.scala
@@ -109,8 +109,8 @@ class DecimalSuite extends SparkFunSuite with PrivateMethodTester {
test("small decimals represented as unscaled long") {
checkCompact(new Decimal(), true)
- checkCompact(Decimal(BigDecimal(10.03)), false)
- checkCompact(Decimal(BigDecimal(1e20)), false)
+ checkCompact(Decimal(BigDecimal("10.03")), false)
+ checkCompact(Decimal(BigDecimal("100000000000000000000")), false)
checkCompact(Decimal(17L), true)
checkCompact(Decimal(17), true)
checkCompact(Decimal(17L, 2, 1), true)
http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index ba7ca84..dae160f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -671,11 +671,11 @@ case class AlterTableRecoverPartitionsCommand(
} else {
logWarning(
s"expected partition column ${partitionNames.head}, but got ${ps(0)}, ignoring it")
- Seq()
+ Seq.empty
}
} else {
logWarning(s"ignore ${new Path(path, name)}")
- Seq()
+ Seq.empty
}
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
index a521fd1..658d137 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
@@ -23,7 +23,6 @@ import org.apache.spark.internal.Logging
import org.apache.spark.Partition
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession, SQLContext}
-import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.jdbc.JdbcDialects
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.StructType
@@ -81,7 +80,7 @@ private[sql] object JDBCRelation extends Logging {
val column = partitioning.column
var i: Int = 0
var currentValue: Long = lowerBound
- var ans = new ArrayBuffer[Partition]()
+ val ans = new ArrayBuffer[Partition]()
while (i < numPartitions) {
val lBound = if (i != 0) s"$column >= $currentValue" else null
currentValue += stride
http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 87fbf8b..64eea26 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -220,7 +220,7 @@ class ParquetFileFormat
val needMerged: Seq[FileStatus] =
if (mergeRespectSummaries) {
- Seq()
+ Seq.empty
} else {
filesByType.data
}
http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
index 3439181..9e4e02b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
@@ -35,8 +35,6 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalGroupState
import org.apache.spark.sql.execution.streaming.GroupStateImpl
import org.apache.spark.sql.streaming.GroupStateTimeout
import org.apache.spark.sql.types._
-import org.apache.spark.util.Utils
-
/**
* Physical version of `ObjectProducer`.
@@ -403,8 +401,7 @@ case class FlatMapGroupsInRExec(
Seq(groupingAttributes.map(SortOrder(_, Ascending)))
override protected def doExecute(): RDD[InternalRow] = {
- val isSerializedRData =
- if (outputSchema == SERIALIZED_R_DATA_SCHEMA) true else false
+ val isSerializedRData = outputSchema == SERIALIZED_R_DATA_SCHEMA
val serializerForR = if (!isSerializedRData) {
SerializationFormats.ROW
} else {
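if (cond) true else false is just cond; collapsing it removes a branch the reader must decode and cannot get wrong:

    def isPositive(x: Int): Boolean =
      x > 0   // rather than: if (x > 0) true else false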
http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/sql/core/src/main/scala/org/apache/spark/sql/execution/r/MapPartitionsRWrapper.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/r/MapPartitionsRWrapper.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/r/MapPartitionsRWrapper.scala
index d2178e9..b9835c7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/r/MapPartitionsRWrapper.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/r/MapPartitionsRWrapper.scala
@@ -34,8 +34,7 @@ case class MapPartitionsRWrapper(
outputSchema: StructType) extends (Iterator[Any] => Iterator[Any]) {
def apply(iter: Iterator[Any]): Iterator[Any] = {
// If the content of current DataFrame is serialized R data?
- val isSerializedRData =
- if (inputSchema == SERIALIZED_R_DATA_SCHEMA) true else false
+ val isSerializedRData = inputSchema == SERIALIZED_R_DATA_SCHEMA
val (newIter, deserializer, colNames) =
if (!isSerializedRData) {
http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
index 408c8f8..e37033b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
@@ -170,12 +170,8 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag](
private def compact(batchId: Long, logs: Array[T]): Boolean = {
val validBatches = getValidBatchesBeforeCompactionBatch(batchId, compactInterval)
val allLogs = validBatches.flatMap(batchId => super.get(batchId)).flatten ++ logs
- if (super.add(batchId, compactLogs(allLogs).toArray)) {
- true
- } else {
- // Return false as there is another writer.
- false
- }
+ // Return false as there is another writer.
+ super.add(batchId, compactLogs(allLogs).toArray)
}
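The same simplification applies when both branches merely relay a Boolean result: returning true when a call returned true and false otherwise is just returning the call, and the explanatory comment moves next to the single expression. A sketch with a hypothetical writeLog callback:

    def tryCompact(writeLog: () => Boolean): Boolean =
      writeLog()   // false means another writer got there first; no if/else needed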
/**
http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 5ee596e..5711262 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -609,7 +609,7 @@ class StreamExecution(
}
// A list of attributes that will need to be updated.
- var replacements = new ArrayBuffer[(Attribute, Attribute)]
+ val replacements = new ArrayBuffer[(Attribute, Attribute)]
// Replace sources in the logical plan with data that has arrived since the last batch.
val withNewSources = logicalPlan transform {
case StreamingExecutionRelation(source, output) =>