Posted to commits@spark.apache.org by pw...@apache.org on 2014/02/09 19:09:30 UTC
[2/3] Merge pull request #557 from ScrapCodes/style. Closes #557.
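Every hunk in this patch is a mechanical scalastyle cleanup: lines longer than 100 characters are wrapped, single-statement if/else bodies gain braces, lowercase long literals such as 0l become 0L, and binary operators such as + get surrounding spaces. The following is a minimal, hypothetical Scala sketch of the before/after shape of these fixes; the object name and identifiers are illustrative only and do not appear anywhere in the patch itself.

object StyleFixSketch {
  // Braces added around single-statement if bodies
  // (as in the DAGScheduler.getPreferredLocs and Stage.addOutputLoc hunks).
  def firstNonEmpty(candidates: Seq[Seq[String]]): Seq[String] = {
    for (locs <- candidates) {
      if (locs != Nil) {
        return locs
      }
    }
    Nil
  }

  // Uppercase long-literal suffix (0L instead of 0l) and spaces around binary
  // operators, as in the BlockFetcherIterator and StorageUtils hunks.
  def totalDiskUsed(sizes: Seq[Long]): Long = sizes.reduceOption(_ + _).getOrElse(0L)

  // Long expressions wrapped so each line stays under the 100-character limit,
  // as in the RDDInfo.toString and logInfo/logDebug changes.
  def describe(name: String, id: Int, cached: Int, total: Int): String = {
    ("RDD \"%s\" (%d) CachedPartitions: %d; " +
      "TotalPartitions: %d").format(name, id, cached, total)
  }
}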
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala
index 09d0a81..56c7777 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala
@@ -39,7 +39,8 @@ private[spark] class ParallelCollectionPartition[T: ClassTag](
override def hashCode(): Int = (41 * (41 + rddId) + slice).toInt
override def equals(other: Any): Boolean = other match {
- case that: ParallelCollectionPartition[_] => (this.rddId == that.rddId && this.slice == that.slice)
+ case that: ParallelCollectionPartition[_] => (this.rddId == that.rddId &&
+ this.slice == that.slice)
case _ => false
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala
index 4c625d0..f436432 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala
@@ -23,8 +23,8 @@ import org.apache.spark.{TaskContext, OneToOneDependency, SparkContext, Partitio
/**
- * Class representing partitions of PartitionerAwareUnionRDD, which maintains the list of corresponding partitions
- * of parent RDDs.
+ * Class representing partitions of PartitionerAwareUnionRDD, which maintains the list of
+ * corresponding partitions of parent RDDs.
*/
private[spark]
class PartitionerAwareUnionRDDPartition(
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala b/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala
index bc68811..73e8769 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala
@@ -35,10 +35,10 @@ private[spark] object CheckpointState extends Enumeration {
}
/**
- * This class contains all the information related to RDD checkpointing. Each instance of this class
- * is associated with a RDD. It manages process of checkpointing of the associated RDD, as well as,
- * manages the post-checkpoint state by providing the updated partitions, iterator and preferred locations
- * of the checkpointed RDD.
+ * This class contains all the information related to RDD checkpointing. Each instance of this
+ * class is associated with a RDD. It manages process of checkpointing of the associated RDD,
+ * as well as, manages the post-checkpoint state by providing the updated partitions,
+ * iterator and preferred locations of the checkpointed RDD.
*/
private[spark] class RDDCheckpointData[T: ClassTag](@transient rdd: RDD[T])
extends Logging with Serializable {
@@ -97,7 +97,7 @@ private[spark] class RDDCheckpointData[T: ClassTag](@transient rdd: RDD[T])
val newRDD = new CheckpointRDD[T](rdd.context, path.toString)
if (newRDD.partitions.size != rdd.partitions.size) {
throw new SparkException(
- "Checkpoint RDD " + newRDD + "("+ newRDD.partitions.size + ") has different " +
+ "Checkpoint RDD " + newRDD + "(" + newRDD.partitions.size + ") has different " +
"number of partitions than original RDD " + rdd + "(" + rdd.partitions.size + ")")
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala
index 2d1bd5b..c9b4c76 100644
--- a/core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala
@@ -71,7 +71,8 @@ class SequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable : ClassTag
val convertKey = !classOf[Writable].isAssignableFrom(self.getKeyClass)
val convertValue = !classOf[Writable].isAssignableFrom(self.getValueClass)
- logInfo("Saving as sequence file of type (" + keyClass.getSimpleName + "," + valueClass.getSimpleName + ")" )
+ logInfo("Saving as sequence file of type (" + keyClass.getSimpleName + "," +
+ valueClass.getSimpleName + ")" )
val format = classOf[SequenceFileOutputFormat[Writable, Writable]]
val jobConf = new JobConf(self.context.hadoopConfiguration)
if (!convertKey && !convertValue) {
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 21d16fa..8021154 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1082,8 +1082,9 @@ class DAGScheduler(
case n: NarrowDependency[_] =>
for (inPart <- n.getParents(partition)) {
val locs = getPreferredLocs(n.rdd, inPart)
- if (locs != Nil)
+ if (locs != Nil) {
return locs
+ }
}
case _ =>
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala
index cc10cc0..23447f1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala
@@ -33,7 +33,7 @@ import scala.collection.JavaConversions._
* Parses and holds information about inputFormat (and files) specified as a parameter.
*/
class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Class[_],
- val path: String) extends Logging {
+ val path: String) extends Logging {
var mapreduceInputFormat: Boolean = false
var mapredInputFormat: Boolean = false
@@ -41,7 +41,8 @@ class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Cl
validate()
override def toString: String = {
- "InputFormatInfo " + super.toString + " .. inputFormatClazz " + inputFormatClazz + ", path : " + path
+ "InputFormatInfo " + super.toString + " .. inputFormatClazz " + inputFormatClazz + ", " +
+ "path : " + path
}
override def hashCode(): Int = {
@@ -50,8 +51,8 @@ class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Cl
hashCode
}
- // Since we are not doing canonicalization of path, this can be wrong : like relative vs absolute path
- // .. which is fine, this is best case effort to remove duplicates - right ?
+ // Since we are not doing canonicalization of path, this can be wrong : like relative vs
+ // absolute path .. which is fine, this is best case effort to remove duplicates - right ?
override def equals(other: Any): Boolean = other match {
case that: InputFormatInfo => {
// not checking config - that should be fine, right ?
@@ -65,22 +66,26 @@ class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Cl
logDebug("validate InputFormatInfo : " + inputFormatClazz + ", path " + path)
try {
- if (classOf[org.apache.hadoop.mapreduce.InputFormat[_, _]].isAssignableFrom(inputFormatClazz)) {
+ if (classOf[org.apache.hadoop.mapreduce.InputFormat[_, _]].isAssignableFrom(
+ inputFormatClazz)) {
logDebug("inputformat is from mapreduce package")
mapreduceInputFormat = true
}
- else if (classOf[org.apache.hadoop.mapred.InputFormat[_, _]].isAssignableFrom(inputFormatClazz)) {
+ else if (classOf[org.apache.hadoop.mapred.InputFormat[_, _]].isAssignableFrom(
+ inputFormatClazz)) {
logDebug("inputformat is from mapred package")
mapredInputFormat = true
}
else {
throw new IllegalArgumentException("Specified inputformat " + inputFormatClazz +
- " is NOT a supported input format ? does not implement either of the supported hadoop api's")
+ " is NOT a supported input format ? does not implement either of the supported hadoop " +
+ "api's")
}
}
catch {
case e: ClassNotFoundException => {
- throw new IllegalArgumentException("Specified inputformat " + inputFormatClazz + " cannot be found ?", e)
+ throw new IllegalArgumentException("Specified inputformat " + inputFormatClazz +
+ " cannot be found ?", e)
}
}
}
@@ -125,8 +130,8 @@ class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Cl
}
private def findPreferredLocations(): Set[SplitInfo] = {
- logDebug("mapreduceInputFormat : " + mapreduceInputFormat + ", mapredInputFormat : " + mapredInputFormat +
- ", inputFormatClazz : " + inputFormatClazz)
+ logDebug("mapreduceInputFormat : " + mapreduceInputFormat + ", mapredInputFormat : " +
+ mapredInputFormat + ", inputFormatClazz : " + inputFormatClazz)
if (mapreduceInputFormat) {
prefLocsFromMapreduceInputFormat()
}
@@ -150,8 +155,8 @@ object InputFormatInfo {
c) Compute rack info for each host and update rack -> count map based on (b).
d) Allocate nodes based on (c)
e) On the allocation result, ensure that we dont allocate "too many" jobs on a single node
- (even if data locality on that is very high) : this is to prevent fragility of job if a single
- (or small set of) hosts go down.
+ (even if data locality on that is very high) : this is to prevent fragility of job if a
+ single (or small set of) hosts go down.
go to (a) until required nodes are allocated.
@@ -159,7 +164,8 @@ object InputFormatInfo {
PS: I know the wording here is weird, hopefully it makes some sense !
*/
- def computePreferredLocations(formats: Seq[InputFormatInfo]): HashMap[String, HashSet[SplitInfo]] = {
+ def computePreferredLocations(formats: Seq[InputFormatInfo]): HashMap[String, HashSet[SplitInfo]]
+ = {
val nodeToSplit = new HashMap[String, HashSet[SplitInfo]]
for (inputSplit <- formats) {
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
index f8fa5a9..b909b66 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
@@ -45,10 +45,11 @@ class JobLogger(val user: String, val logDirName: String)
String.valueOf(System.currentTimeMillis()))
private val logDir =
- if (System.getenv("SPARK_LOG_DIR") != null)
+ if (System.getenv("SPARK_LOG_DIR") != null) {
System.getenv("SPARK_LOG_DIR")
- else
+ } else {
"/tmp/spark-%s".format(user)
+ }
private val jobIDToPrintWriter = new HashMap[Int, PrintWriter]
private val stageIDToJobID = new HashMap[Int, Int]
@@ -116,7 +117,7 @@ class JobLogger(val user: String, val logDirName: String)
var writeInfo = info
if (withTime) {
val date = new Date(System.currentTimeMillis())
- writeInfo = DATE_FORMAT.format(date) + ": " +info
+ writeInfo = DATE_FORMAT.format(date) + ": " + info
}
jobIDToPrintWriter.get(jobID).foreach(_.println(writeInfo))
}
@@ -235,7 +236,8 @@ class JobLogger(val user: String, val logDirName: String)
* @param stage Root stage of the job
* @param indent Indent number before info, default is 0
*/
- protected def recordStageDepGraph(jobID: Int, stage: Stage, idSet: HashSet[Int], indent: Int = 0) {
+ protected def recordStageDepGraph(jobID: Int, stage: Stage, idSet: HashSet[Int], indent: Int = 0)
+ {
val stageInfo = if (stage.isShuffleMap) {
"STAGE_ID=" + stage.id + " MAP_STAGE SHUFFLE_ID=" + stage.shuffleDep.get.shuffleId
} else {
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/scheduler/JobResult.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobResult.scala b/core/src/main/scala/org/apache/spark/scheduler/JobResult.scala
index c381348..d94f6ad 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/JobResult.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/JobResult.scala
@@ -23,4 +23,5 @@ package org.apache.spark.scheduler
private[spark] sealed trait JobResult
private[spark] case object JobSucceeded extends JobResult
-private[spark] case class JobFailed(exception: Exception, failedStage: Option[Stage]) extends JobResult
+private[spark] case class JobFailed(exception: Exception, failedStage: Option[Stage])
+ extends JobResult
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
index 28f3ba5..0544f81 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
@@ -36,7 +36,8 @@ private[spark] object ResultTask {
val metadataCleaner = new MetadataCleaner(
MetadataCleanerType.RESULT_TASK, serializedInfoCache.clearOldValues, new SparkConf)
- def serializeInfo(stageId: Int, rdd: RDD[_], func: (TaskContext, Iterator[_]) => _): Array[Byte] = {
+ def serializeInfo(stageId: Int, rdd: RDD[_], func: (TaskContext, Iterator[_]) => _)
+ : Array[Byte] = {
synchronized {
val old = serializedInfoCache.get(stageId).orNull
if (old != null) {
@@ -55,7 +56,8 @@ private[spark] object ResultTask {
}
}
- def deserializeInfo(stageId: Int, bytes: Array[Byte]): (RDD[_], (TaskContext, Iterator[_]) => _) = {
+ def deserializeInfo(stageId: Int, bytes: Array[Byte])
+ : (RDD[_], (TaskContext, Iterator[_]) => _) = {
val loader = Thread.currentThread.getContextClassLoader
val in = new GZIPInputStream(new ByteArrayInputStream(bytes))
val ser = SparkEnv.get.closureSerializer.newInstance()
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala
index 3cf995e..a546193 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala
@@ -148,6 +148,6 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf)
}
}
parentPool.addSchedulable(manager)
- logInfo("Added task set " + manager.name + " tasks to pool "+poolName)
+ logInfo("Added task set " + manager.name + " tasks to pool " + poolName)
}
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index d8e97c3..d25f0a6 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -37,8 +37,8 @@ case class SparkListenerTaskGettingResult(
case class SparkListenerTaskEnd(task: Task[_], reason: TaskEndReason, taskInfo: TaskInfo,
taskMetrics: TaskMetrics) extends SparkListenerEvents
-case class SparkListenerJobStart(job: ActiveJob, stageIds: Array[Int], properties: Properties = null)
- extends SparkListenerEvents
+case class SparkListenerJobStart(job: ActiveJob, stageIds: Array[Int],
+ properties: Properties = null) extends SparkListenerEvents
case class SparkListenerJobEnd(job: ActiveJob, jobResult: JobResult)
extends SparkListenerEvents
@@ -99,11 +99,14 @@ class StatsReportListener extends SparkListener with Logging {
showMillisDistribution("task runtime:", (info, _) => Some(info.duration))
//shuffle write
- showBytesDistribution("shuffle bytes written:",(_,metric) => metric.shuffleWriteMetrics.map{_.shuffleBytesWritten})
+ showBytesDistribution("shuffle bytes written:",
+ (_,metric) => metric.shuffleWriteMetrics.map{_.shuffleBytesWritten})
//fetch & io
- showMillisDistribution("fetch wait time:",(_, metric) => metric.shuffleReadMetrics.map{_.fetchWaitTime})
- showBytesDistribution("remote bytes read:", (_, metric) => metric.shuffleReadMetrics.map{_.remoteBytesRead})
+ showMillisDistribution("fetch wait time:",
+ (_, metric) => metric.shuffleReadMetrics.map{_.fetchWaitTime})
+ showBytesDistribution("remote bytes read:",
+ (_, metric) => metric.shuffleReadMetrics.map{_.remoteBytesRead})
showBytesDistribution("task result size:", (_, metric) => Some(metric.resultSize))
//runtime breakdown
@@ -111,8 +114,10 @@ class StatsReportListener extends SparkListener with Logging {
val runtimePcts = stageCompleted.stage.taskInfos.map{
case (info, metrics) => RuntimePercentage(info.duration, metrics)
}
- showDistribution("executor (non-fetch) time pct: ", Distribution(runtimePcts.map{_.executorPct * 100}), "%2.0f %%")
- showDistribution("fetch wait time pct: ", Distribution(runtimePcts.flatMap{_.fetchPct.map{_ * 100}}), "%2.0f %%")
+ showDistribution("executor (non-fetch) time pct: ",
+ Distribution(runtimePcts.map{_.executorPct * 100}), "%2.0f %%")
+ showDistribution("fetch wait time pct: ",
+ Distribution(runtimePcts.flatMap{_.fetchPct.map{_ * 100}}), "%2.0f %%")
showDistribution("other time pct: ", Distribution(runtimePcts.map{_.other * 100}), "%2.0f %%")
}
@@ -147,7 +152,8 @@ private[spark] object StatsReportListener extends Logging {
logInfo("\t" + quantiles.mkString("\t"))
}
- def showDistribution(heading: String, dOpt: Option[Distribution], formatNumber: Double => String) {
+ def showDistribution(heading: String,
+ dOpt: Option[Distribution], formatNumber: Double => String) {
dOpt.foreach { d => showDistribution(heading, d, formatNumber)}
}
@@ -156,7 +162,8 @@ private[spark] object StatsReportListener extends Logging {
showDistribution(heading, dOpt, f _)
}
- def showDistribution(heading:String, format: String, getMetric: (TaskInfo,TaskMetrics) => Option[Double])
+ def showDistribution(heading:String, format: String,
+ getMetric: (TaskInfo,TaskMetrics) => Option[Double])
(implicit stage: SparkListenerStageCompleted) {
showDistribution(heading, extractDoubleDistribution(stage, getMetric), format)
}
@@ -175,7 +182,8 @@ private[spark] object StatsReportListener extends Logging {
}
def showMillisDistribution(heading: String, dOpt: Option[Distribution]) {
- showDistribution(heading, dOpt, (d => StatsReportListener.millisToString(d.toLong)): Double => String)
+ showDistribution(heading, dOpt,
+ (d => StatsReportListener.millisToString(d.toLong)): Double => String)
}
def showMillisDistribution(heading: String, getMetric: (TaskInfo, TaskMetrics) => Option[Long])
@@ -212,7 +220,7 @@ private object RuntimePercentage {
val denom = totalTime.toDouble
val fetchTime = metrics.shuffleReadMetrics.map{_.fetchWaitTime}
val fetch = fetchTime.map{_ / denom}
- val exec = (metrics.executorRunTime - fetchTime.getOrElse(0l)) / denom
+ val exec = (metrics.executorRunTime - fetchTime.getOrElse(0L)) / denom
val other = 1.0 - (exec + fetch.getOrElse(0d))
RuntimePercentage(exec, fetch, other)
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
index 520c0b2..a78b018 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
@@ -63,8 +63,9 @@ private[spark] class Stage(
def addOutputLoc(partition: Int, status: MapStatus) {
val prevList = outputLocs(partition)
outputLocs(partition) = status :: prevList
- if (prevList == Nil)
+ if (prevList == Nil) {
numAvailableOutputs += 1
+ }
}
def removeOutputLoc(partition: Int, bmAddress: BlockManagerId) {
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
index c4d1ad5..8f320e5 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
@@ -29,7 +29,8 @@ import org.apache.spark.executor.TaskMetrics
*/
class StageInfo(
stage: Stage,
- val taskInfos: mutable.Buffer[(TaskInfo, TaskMetrics)] = mutable.Buffer[(TaskInfo, TaskMetrics)]()
+ val taskInfos: mutable.Buffer[(TaskInfo, TaskMetrics)] =
+ mutable.Buffer[(TaskInfo, TaskMetrics)]()
) {
val stageId = stage.id
/** When this stage was submitted from the DAGScheduler to a TaskScheduler. */
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
index 3c22edd..91c27d7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
@@ -70,16 +70,17 @@ class TaskInfo(
def running: Boolean = !finished
def status: String = {
- if (running)
+ if (running) {
"RUNNING"
- else if (gettingResult)
+ } else if (gettingResult) {
"GET RESULT"
- else if (failed)
+ } else if (failed) {
"FAILED"
- else if (successful)
+ } else if (successful) {
"SUCCESS"
- else
+ } else {
"UNKNOWN"
+ }
}
def duration: Long = {
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
index 9d3e615..5724ec9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
@@ -35,7 +35,8 @@ case class IndirectTaskResult[T](blockId: BlockId) extends TaskResult[T] with Se
/** A TaskResult that contains the task's return value and accumulator updates. */
private[spark]
-class DirectTaskResult[T](var valueBytes: ByteBuffer, var accumUpdates: Map[Long, Any], var metrics: TaskMetrics)
+class DirectTaskResult[T](var valueBytes: ByteBuffer, var accumUpdates: Map[Long, Any],
+ var metrics: TaskMetrics)
extends TaskResult[T] with Externalizable {
def this() = this(null.asInstanceOf[ByteBuffer], null, null)
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
index 17b6d97..1cdfed1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
@@ -36,7 +36,8 @@ private[spark] trait TaskScheduler {
def start(): Unit
// Invoked after system has successfully initialized (typically in spark context).
- // Yarn uses this to bootstrap allocation of resources based on preferred locations, wait for slave registerations, etc.
+ // Yarn uses this to bootstrap allocation of resources based on preferred locations,
+ // wait for slave registerations, etc.
def postStartHook() { }
// Disconnect from the cluster.
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 3f0ee7a..21b2ff1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -80,7 +80,7 @@ private[spark] class TaskSetManager(
var minShare = 0
var priority = taskSet.priority
var stageId = taskSet.stageId
- var name = "TaskSet_"+taskSet.stageId.toString
+ var name = "TaskSet_" + taskSet.stageId.toString
var parent: Pool = null
val runningTasksSet = new HashSet[Long]
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 0208388..7820410 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -120,7 +120,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
sender ! true
case DisassociatedEvent(_, address, _) =>
- addressToExecutorId.get(address).foreach(removeExecutor(_, "remote Akka client disassociated"))
+ addressToExecutorId.get(address).foreach(removeExecutor(_,
+ "remote Akka client disassociated"))
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 33aac52..04f35cc 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -51,8 +51,8 @@ private[spark] class SparkDeploySchedulerBackend(
val command = Command(
"org.apache.spark.executor.CoarseGrainedExecutorBackend", args, sc.executorEnvs)
val sparkHome = sc.getSparkHome()
- val appDesc = new ApplicationDescription(appName, maxCores, sc.executorMemory, command, sparkHome,
- "http://" + sc.ui.appUIAddress)
+ val appDesc = new ApplicationDescription(appName, maxCores, sc.executorMemory, command,
+ sparkHome, "http://" + sc.ui.appUIAddress)
client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf)
client.start()
@@ -84,7 +84,8 @@ private[spark] class SparkDeploySchedulerBackend(
}
}
- override def executorAdded(fullId: String, workerId: String, hostPort: String, cores: Int, memory: Int) {
+ override def executorAdded(fullId: String, workerId: String, hostPort: String, cores: Int,
+ memory: Int) {
logInfo("Granted executor ID %s on hostPort %s with %d cores, %s RAM".format(
fullId, hostPort, cores, Utils.megabytesToString(memory)))
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index c27049b..4401f6d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -136,7 +136,8 @@ private[spark] class CoarseMesosSchedulerBackend(
// glob the directory "correctly".
val basename = uri.split('/').last.split('.').head
command.setValue(
- "cd %s*; ./bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %d"
+ ("cd %s*; " +
+ "./bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %d")
.format(basename, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores))
command.addUris(CommandInfo.URI.newBuilder().setValue(uri))
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index c14cd47..2d0b255 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -60,7 +60,8 @@ class KryoSerializer(conf: SparkConf) extends org.apache.spark.serializer.Serial
try {
for (regCls <- conf.getOption("spark.kryo.registrator")) {
logDebug("Running user registrator: " + regCls)
- val reg = Class.forName(regCls, true, classLoader).newInstance().asInstanceOf[KryoRegistrator]
+ val reg = Class.forName(regCls, true, classLoader).newInstance()
+ .asInstanceOf[KryoRegistrator]
reg.registerClasses(kryo)
}
} catch {
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/serializer/Serializer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/serializer/Serializer.scala b/core/src/main/scala/org/apache/spark/serializer/Serializer.scala
index 9a5e3cb..a38a2b5 100644
--- a/core/src/main/scala/org/apache/spark/serializer/Serializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/Serializer.scala
@@ -27,11 +27,12 @@ import org.apache.spark.util.{NextIterator, ByteBufferInputStream}
/**
* A serializer. Because some serialization libraries are not thread safe, this class is used to
- * create [[org.apache.spark.serializer.SerializerInstance]] objects that do the actual serialization and are
- * guaranteed to only be called from one thread at a time.
+ * create [[org.apache.spark.serializer.SerializerInstance]] objects that do the actual
+ * serialization and are guaranteed to only be called from one thread at a time.
*
* Implementations of this trait should have a zero-arg constructor or a constructor that accepts a
- * [[org.apache.spark.SparkConf]] as parameter. If both constructors are defined, the latter takes precedence.
+ * [[org.apache.spark.SparkConf]] as parameter. If both constructors are defined, the latter takes
+ * precedence.
*/
trait Serializer {
def newInstance(): SerializerInstance
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
index 4fa2ab9..aa62ab5 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
@@ -76,9 +76,9 @@ object BlockFetcherIterator {
import blockManager._
- private var _remoteBytesRead = 0l
- private var _remoteFetchTime = 0l
- private var _fetchWaitTime = 0l
+ private var _remoteBytesRead = 0L
+ private var _remoteFetchTime = 0L
+ private var _fetchWaitTime = 0L
if (blocksByAddress == null) {
throw new IllegalArgumentException("BlocksByAddress is null")
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index ed53558..542deb9 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -206,8 +206,9 @@ private[spark] class BlockManager(
* message reflecting the current status, *not* the desired storage level in its block info.
* For example, a block with MEMORY_AND_DISK set might have fallen out to be only on disk.
*
- * droppedMemorySize exists to account for when block is dropped from memory to disk (so it is still valid).
- * This ensures that update in master will compensate for the increase in memory on slave.
+ * droppedMemorySize exists to account for when block is dropped from memory to disk (so it
+ * is still valid). This ensures that update in master will compensate for the increase in
+ * memory on slave.
*/
def reportBlockStatus(blockId: BlockId, info: BlockInfo, droppedMemorySize: Long = 0L) {
val needReregister = !tryToReportBlockStatus(blockId, info, droppedMemorySize)
@@ -224,7 +225,8 @@ private[spark] class BlockManager(
* which will be true if the block was successfully recorded and false if
* the slave needs to re-register.
*/
- private def tryToReportBlockStatus(blockId: BlockId, info: BlockInfo, droppedMemorySize: Long = 0L): Boolean = {
+ private def tryToReportBlockStatus(blockId: BlockId, info: BlockInfo,
+ droppedMemorySize: Long = 0L): Boolean = {
val (curLevel, inMemSize, onDiskSize, tellMaster) = info.synchronized {
info.level match {
case null =>
@@ -282,14 +284,15 @@ private[spark] class BlockManager(
// As an optimization for map output fetches, if the block is for a shuffle, return it
// without acquiring a lock; the disk store never deletes (recent) items so this should work
if (blockId.isShuffle) {
- return diskStore.getBytes(blockId) match {
+ diskStore.getBytes(blockId) match {
case Some(bytes) =>
Some(bytes)
case None =>
throw new Exception("Block " + blockId + " not found on disk, though it should be")
}
+ } else {
+ doGetLocal(blockId, asValues = false).asInstanceOf[Option[ByteBuffer]]
}
- doGetLocal(blockId, asValues = false).asInstanceOf[Option[ByteBuffer]]
}
private def doGetLocal(blockId: BlockId, asValues: Boolean): Option[Any] = {
@@ -701,7 +704,8 @@ private[spark] class BlockManager(
diskStore.putBytes(blockId, bytes, level)
}
}
- val droppedMemorySize = if (memoryStore.contains(blockId)) memoryStore.getSize(blockId) else 0L
+ val droppedMemorySize =
+ if (memoryStore.contains(blockId)) memoryStore.getSize(blockId) else 0L
val blockWasRemoved = memoryStore.remove(blockId)
if (!blockWasRemoved) {
logWarning("Block " + blockId + " could not be dropped from memory as it does not exist")
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
index 2c1a4e2..893418f 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
@@ -61,8 +61,8 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf) extends Act
override def preStart() {
if (!BlockManager.getDisableHeartBeatsForTesting(conf)) {
import context.dispatcher
- timeoutCheckingTask = context.system.scheduler.schedule(
- 0.seconds, checkTimeoutInterval.milliseconds, self, ExpireDeadHosts)
+ timeoutCheckingTask = context.system.scheduler.schedule(0.seconds,
+ checkTimeoutInterval.milliseconds, self, ExpireDeadHosts)
}
super.preStart()
}
@@ -169,8 +169,8 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf) extends Act
val toRemove = new mutable.HashSet[BlockManagerId]
for (info <- blockManagerInfo.values) {
if (info.lastSeenMs < minSeenTime) {
- logWarning("Removing BlockManager " + info.blockManagerId + " with no recent heart beats: " +
- (now - info.lastSeenMs) + "ms exceeds " + slaveTimeout + "ms")
+ logWarning("Removing BlockManager " + info.blockManagerId + " with no recent heart beats: "
+ + (now - info.lastSeenMs) + "ms exceeds " + slaveTimeout + "ms")
toRemove += info.blockManagerId
}
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala
index 365866d..7cf754f 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala
@@ -57,9 +57,9 @@ private[spark] class BlockManagerSource(val blockManager: BlockManager, sc: Spar
override def getValue: Long = {
val storageStatusList = blockManager.master.getStorageStatus
val diskSpaceUsed = storageStatusList
- .flatMap(_.blocks.values.map(_.diskSize))
- .reduceOption(_ + _)
- .getOrElse(0L)
+ .flatMap(_.blocks.values.map(_.diskSize))
+ .reduceOption(_ + _)
+ .getOrElse(0L)
diskSpaceUsed / 1024 / 1024
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/storage/BlockMessageArray.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockMessageArray.scala b/core/src/main/scala/org/apache/spark/storage/BlockMessageArray.scala
index 5932936..5ded9ab 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockMessageArray.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockMessageArray.scala
@@ -25,7 +25,8 @@ import org.apache.spark._
import org.apache.spark.network._
private[spark]
-class BlockMessageArray(var blockMessages: Seq[BlockMessage]) extends Seq[BlockMessage] with Logging {
+class BlockMessageArray(var blockMessages: Seq[BlockMessage])
+ extends Seq[BlockMessage] with Logging {
def this(bm: BlockMessage) = this(Array(bm))
@@ -65,7 +66,8 @@ class BlockMessageArray(var blockMessages: Seq[BlockMessage]) extends Seq[BlockM
buffer.position(buffer.position() + size)
}
val finishTime = System.currentTimeMillis
- logDebug("Converted block message array from buffer message in " + (finishTime - startTime) / 1000.0 + " s")
+ logDebug("Converted block message array from buffer message in " +
+ (finishTime - startTime) / 1000.0 + " s")
this.blockMessages = newBlockMessages
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
index 1720007..50a0cdb 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
@@ -25,15 +25,15 @@ private[spark]
case class StorageStatus(blockManagerId: BlockManagerId, maxMem: Long,
blocks: Map[BlockId, BlockStatus]) {
- def memUsed() = blocks.values.map(_.memSize).reduceOption(_+_).getOrElse(0L)
+ def memUsed() = blocks.values.map(_.memSize).reduceOption(_ + _).getOrElse(0L)
def memUsedByRDD(rddId: Int) =
- rddBlocks.filterKeys(_.rddId == rddId).values.map(_.memSize).reduceOption(_+_).getOrElse(0L)
+ rddBlocks.filterKeys(_.rddId == rddId).values.map(_.memSize).reduceOption(_ + _).getOrElse(0L)
- def diskUsed() = blocks.values.map(_.diskSize).reduceOption(_+_).getOrElse(0L)
+ def diskUsed() = blocks.values.map(_.diskSize).reduceOption(_ + _).getOrElse(0L)
def diskUsedByRDD(rddId: Int) =
- rddBlocks.filterKeys(_.rddId == rddId).values.map(_.diskSize).reduceOption(_+_).getOrElse(0L)
+ rddBlocks.filterKeys(_.rddId == rddId).values.map(_.diskSize).reduceOption(_ + _).getOrElse(0L)
def memRemaining : Long = maxMem - memUsed()
@@ -48,8 +48,9 @@ case class RDDInfo(id: Int, name: String, storageLevel: StorageLevel,
extends Ordered[RDDInfo] {
override def toString = {
import Utils.bytesToString
- "RDD \"%s\" (%d) Storage: %s; CachedPartitions: %d; TotalPartitions: %d; MemorySize: %s; DiskSize: %s".format(name, id,
- storageLevel.toString, numCachedPartitions, numPartitions, bytesToString(memSize), bytesToString(diskSize))
+ ("RDD \"%s\" (%d) Storage: %s; CachedPartitions: %d; TotalPartitions: %d; MemorySize: %s; " +
+ "DiskSize: %s").format(name, id, storageLevel.toString, numCachedPartitions,
+ numPartitions, bytesToString(memSize), bytesToString(diskSize))
}
override def compare(that: RDDInfo) = {
@@ -64,7 +65,8 @@ object StorageUtils {
/* Returns RDD-level information, compiled from a list of StorageStatus objects */
def rddInfoFromStorageStatus(storageStatusList: Seq[StorageStatus],
sc: SparkContext) : Array[RDDInfo] = {
- rddInfoFromBlockStatusList(storageStatusList.flatMap(_.rddBlocks).toMap[RDDBlockId, BlockStatus], sc)
+ rddInfoFromBlockStatusList(
+ storageStatusList.flatMap(_.rddBlocks).toMap[RDDBlockId, BlockStatus], sc)
}
/* Returns a map of blocks to their locations, compiled from a list of StorageStatus objects */
@@ -91,7 +93,8 @@ object StorageUtils {
sc.persistentRdds.get(rddId).map { r =>
val rddName = Option(r.name).getOrElse(rddId.toString)
val rddStorageLevel = r.getStorageLevel
- RDDInfo(rddId, rddName, rddStorageLevel, rddBlocks.length, r.partitions.size, memSize, diskSize)
+ RDDInfo(rddId, rddName, rddStorageLevel, rddBlocks.length, r.partitions.size,
+ memSize, diskSize)
}
}.flatten.toArray
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
index 5573b38..b95c8f4 100644
--- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
@@ -48,14 +48,16 @@ private[spark] object UIUtils {
case _ => <li><a href={prependBaseUri("/environment")}>Environment</a></li>
}
val executors = page match {
- case Executors => <li class="active"><a href={prependBaseUri("/executors")}>Executors</a></li>
+ case Executors => <li class="active"><a href={prependBaseUri("/executors")}>Executors</a>
+ </li>
case _ => <li><a href={prependBaseUri("/executors")}>Executors</a></li>
}
<html>
<head>
<meta http-equiv="Content-type" content="text/html; charset=utf-8" />
- <link rel="stylesheet" href={prependBaseUri("/static/bootstrap.min.css")} type="text/css" />
+ <link rel="stylesheet" href={prependBaseUri("/static/bootstrap.min.css")}
+ type="text/css" />
<link rel="stylesheet" href={prependBaseUri("/static/webui.css")} type="text/css" />
<script src={prependBaseUri("/static/sorttable.js")} ></script>
<title>{sc.appName} - {title}</title>
@@ -63,7 +65,8 @@ private[spark] object UIUtils {
<body>
<div class="navbar navbar-static-top">
<div class="navbar-inner">
- <a href={prependBaseUri("/")} class="brand"><img src={prependBaseUri("/static/spark-logo-77x50px-hd.png")} /></a>
+ <a href={prependBaseUri("/")} class="brand">
+ <img src={prependBaseUri("/static/spark-logo-77x50px-hd.png")} /></a>
<ul class="nav">
{jobs}
{storage}
@@ -93,7 +96,8 @@ private[spark] object UIUtils {
<html>
<head>
<meta http-equiv="Content-type" content="text/html; charset=utf-8" />
- <link rel="stylesheet" href={prependBaseUri("/static/bootstrap.min.css")} type="text/css" />
+ <link rel="stylesheet" href={prependBaseUri("/static/bootstrap.min.css")}
+ type="text/css" />
<link rel="stylesheet" href={prependBaseUri("/static/webui.css")} type="text/css" />
<script src={prependBaseUri("/static/sorttable.js")} ></script>
<title>{title}</title>
@@ -103,7 +107,8 @@ private[spark] object UIUtils {
<div class="row-fluid">
<div class="span12">
<h3 style="vertical-align: middle; display: inline-block;">
- <img src={prependBaseUri("/static/spark-logo-77x50px-hd.png")} style="margin-right: 15px;" />
+ <img src={prependBaseUri("/static/spark-logo-77x50px-hd.png")}
+ style="margin-right: 15px;" />
{title}
</h3>
</div>
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala b/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala
index 6ba1518..f913ee4 100644
--- a/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala
+++ b/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala
@@ -36,7 +36,8 @@ private[spark] object UIWorkloadGenerator {
def main(args: Array[String]) {
if (args.length < 2) {
- println("usage: ./bin/spark-class org.apache.spark.ui.UIWorkloadGenerator [master] [FIFO|FAIR]")
+ println(
+ "usage: ./bin/spark-class org.apache.spark.ui.UIWorkloadGenerator [master] [FIFO|FAIR]")
System.exit(1)
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala
index a31a7e1..4e41acf 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala
@@ -51,9 +51,9 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
def render(request: HttpServletRequest): Seq[Node] = {
val storageStatusList = sc.getExecutorStorageStatus
- val maxMem = storageStatusList.map(_.maxMem).fold(0L)(_+_)
- val memUsed = storageStatusList.map(_.memUsed()).fold(0L)(_+_)
- val diskSpaceUsed = storageStatusList.flatMap(_.blocks.values.map(_.diskSize)).fold(0L)(_+_)
+ val maxMem = storageStatusList.map(_.maxMem).fold(0L)(_ + _)
+ val memUsed = storageStatusList.map(_.memUsed()).fold(0L)(_ + _)
+ val diskSpaceUsed = storageStatusList.flatMap(_.blocks.values.map(_.diskSize)).fold(0L)(_ + _)
val execHead = Seq("Executor ID", "Address", "RDD blocks", "Memory used", "Disk used",
"Active tasks", "Failed tasks", "Complete tasks", "Total tasks", "Task Time", "Shuffle Read",
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala
index ca5a286..6289f87 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala
@@ -43,7 +43,8 @@ private[spark] class IndexPage(parent: JobProgressUI) {
}
val activeStagesTable = new StageTable(activeStages.sortBy(_.submissionTime).reverse, parent)
- val completedStagesTable = new StageTable(completedStages.sortBy(_.submissionTime).reverse, parent)
+ val completedStagesTable = new StageTable(completedStages.sortBy(_.submissionTime).reverse,
+ parent)
val failedStagesTable = new StageTable(failedStages.sortBy(_.submissionTime).reverse, parent)
val pools = listener.sc.getAllPools
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala
index cfeeccd..9412a48 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala
@@ -60,7 +60,10 @@ private[spark] class PoolTable(pools: Seq[Schedulable], listener: JobProgressLis
case None => 0
}
<tr>
- <td><a href={"%s/stages/pool?poolname=%s".format(UIUtils.prependBaseUri(),p.name)}>{p.name}</a></td>
+ <td>
+ <a href=
+ {"%s/stages/pool?poolname=%s".format(UIUtils.prependBaseUri(),p.name)}>
+ {p.name}</a></td>
<td>{p.minShare}</td>
<td>{p.weight}</td>
<td>{activeStages}</td>
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index cfaf121..08107a3 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -64,7 +64,7 @@ private[spark] class StagePage(parent: JobProgressUI) {
listener.stageIdToTasksActive(stageId).foreach(activeTime += _.timeRunning(now))
val finishedTasks = listener.stageIdToTaskInfos(stageId).filter(_._1.finished)
-
+ // scalastyle:off
val summary =
<div>
<ul class="unstyled">
@@ -96,7 +96,7 @@ private[spark] class StagePage(parent: JobProgressUI) {
}
</ul>
</div>
-
+ // scalastyle:on
val taskHeaders: Seq[String] =
Seq("Task Index", "Task ID", "Status", "Locality Level", "Executor", "Launch Time") ++
Seq("Duration", "GC Time", "Result Ser Time") ++
@@ -105,7 +105,8 @@ private[spark] class StagePage(parent: JobProgressUI) {
{if (hasBytesSpilled) Seq("Shuffle Spill (Memory)", "Shuffle Spill (Disk)") else Nil} ++
Seq("Errors")
- val taskTable = listingTable(taskHeaders, taskRow(hasShuffleRead, hasShuffleWrite, hasBytesSpilled), tasks)
+ val taskTable = listingTable(
+ taskHeaders, taskRow(hasShuffleRead, hasShuffleWrite, hasBytesSpilled), tasks)
// Excludes tasks which failed and have incomplete metrics
val validTasks = tasks.filter(t => t._1.status == "SUCCESS" && (t._2.isDefined))
@@ -117,8 +118,9 @@ private[spark] class StagePage(parent: JobProgressUI) {
else {
val serializationTimes = validTasks.map{case (info, metrics, exception) =>
metrics.get.resultSerializationTime.toDouble}
- val serializationQuantiles = "Result serialization time" +: Distribution(serializationTimes).get.getQuantiles().map(
- ms => parent.formatDuration(ms.toLong))
+ val serializationQuantiles =
+ "Result serialization time" +: Distribution(serializationTimes).
+ get.getQuantiles().map(ms => parent.formatDuration(ms.toLong))
val serviceTimes = validTasks.map{case (info, metrics, exception) =>
metrics.get.executorRunTime.toDouble}
@@ -225,7 +227,8 @@ private[spark] class StagePage(parent: JobProgressUI) {
val shuffleReadSortable = maybeShuffleRead.map(_.toString).getOrElse("")
val shuffleReadReadable = maybeShuffleRead.map{Utils.bytesToString(_)}.getOrElse("")
- val maybeShuffleWrite = metrics.flatMap{m => m.shuffleWriteMetrics}.map{s => s.shuffleBytesWritten}
+ val maybeShuffleWrite =
+ metrics.flatMap{m => m.shuffleWriteMetrics}.map{s => s.shuffleBytesWritten}
val shuffleWriteSortable = maybeShuffleWrite.map(_.toString).getOrElse("")
val shuffleWriteReadable = maybeShuffleWrite.map{Utils.bytesToString(_)}.getOrElse("")
@@ -236,7 +239,8 @@ private[spark] class StagePage(parent: JobProgressUI) {
val maybeMemoryBytesSpilled = metrics.map{m => m.memoryBytesSpilled}
val memoryBytesSpilledSortable = maybeMemoryBytesSpilled.map(_.toString).getOrElse("")
- val memoryBytesSpilledReadable = maybeMemoryBytesSpilled.map{Utils.bytesToString(_)}.getOrElse("")
+ val memoryBytesSpilledReadable = maybeMemoryBytesSpilled.map{Utils.bytesToString(_)}
+ .getOrElse("")
val maybeDiskBytesSpilled = metrics.map{m => m.diskBytesSpilled}
val diskBytesSpilledSortable = maybeDiskBytesSpilled.map(_.toString).getOrElse("")
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
index 9ad6de3..01b6479 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
@@ -59,7 +59,8 @@ private[spark] class StageTable(val stages: Seq[StageInfo], val parent: JobProgr
</table>
}
- private def makeProgressBar(started: Int, completed: Int, failed: String, total: Int): Seq[Node] = {
+ private def makeProgressBar(started: Int, completed: Int, failed: String, total: Int)
+ : Seq[Node] = {
val completeWidth = "width: %s%%".format((completed.toDouble/total)*100)
val startWidth = "width: %s%%".format((started.toDouble/total)*100)
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
index 1df6b87..3eb0f08 100644
--- a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
@@ -24,7 +24,7 @@ import scala.collection.mutable.Set
import org.objectweb.asm.{ClassReader, ClassVisitor, MethodVisitor, Type}
import org.objectweb.asm.Opcodes._
-import java.io.{InputStream, IOException, ByteArrayOutputStream, ByteArrayInputStream, BufferedInputStream}
+import java.io.{ByteArrayOutputStream, ByteArrayInputStream}
import org.apache.spark.Logging
private[spark] object ClosureCleaner extends Logging {
@@ -159,8 +159,9 @@ private[spark] object ClosureCleaner extends Logging {
// other than to set its fields, so use its constructor
val cons = cls.getConstructors()(0)
val params = cons.getParameterTypes.map(createNullValue).toArray
- if (outer != null)
+ if (outer != null) {
params(0) = outer // First param is always outer object
+ }
return cons.newInstance(params: _*).asInstanceOf[AnyRef]
} else {
// Use reflection to instantiate object without calling constructor
@@ -179,7 +180,8 @@ private[spark] object ClosureCleaner extends Logging {
}
}
-private[spark] class FieldAccessFinder(output: Map[Class[_], Set[String]]) extends ClassVisitor(ASM4) {
+private[spark] class FieldAccessFinder(output: Map[Class[_], Set[String]])
+ extends ClassVisitor(ASM4) {
override def visitMethod(access: Int, name: String, desc: String,
sig: String, exceptions: Array[String]): MethodVisitor = {
new MethodVisitor(ASM4) {
@@ -221,11 +223,12 @@ private[spark] class InnerClosureFinder(output: Set[Class[_]]) extends ClassVisi
val argTypes = Type.getArgumentTypes(desc)
if (op == INVOKESPECIAL && name == "<init>" && argTypes.length > 0
&& argTypes(0).toString.startsWith("L") // is it an object?
- && argTypes(0).getInternalName == myName)
+ && argTypes(0).getInternalName == myName) {
output += Class.forName(
owner.replace('/', '.'),
false,
Thread.currentThread.getContextClassLoader)
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/util/CompletionIterator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/CompletionIterator.scala b/core/src/main/scala/org/apache/spark/util/CompletionIterator.scala
index fcc1ca9..b6a0998 100644
--- a/core/src/main/scala/org/apache/spark/util/CompletionIterator.scala
+++ b/core/src/main/scala/org/apache/spark/util/CompletionIterator.scala
@@ -21,7 +21,10 @@ package org.apache.spark.util
* Wrapper around an iterator which calls a completion method after it successfully iterates
* through all the elements.
*/
-private[spark] abstract class CompletionIterator[+A, +I <: Iterator[A]](sub: I) extends Iterator[A]{
+private[spark]
+// scalastyle:off
+abstract class CompletionIterator[ +A, +I <: Iterator[A]](sub: I) extends Iterator[A] {
+// scalastyle:on
def next() = sub.next()
def hasNext = {
val r = sub.hasNext
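
The change above wraps CompletionIterator's header and fences it with scalastyle markers. For context, a simplified stand-alone sketch of the completion-callback contract the class implements (not the Spark class itself; here the callback is passed as a function to keep the sketch short):

    // Wrap an iterator and run a callback once it reports exhaustion.
    class CompletionIteratorSketch[A](sub: Iterator[A], completion: () => Unit) extends Iterator[A] {
      def next(): A = sub.next()
      def hasNext: Boolean = {
        val r = sub.hasNext
        if (!r) {
          completion() // fires when the wrapped iterator has no more elements
        }
        r
      }
    }

    object CompletionIteratorDemo {
      def main(args: Array[String]): Unit = {
        val it = new CompletionIteratorSketch(Iterator(1, 2, 3), () => println("done"))
        it.foreach(println) // prints 1, 2, 3, then "done"
      }
    }
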
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/util/Distribution.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Distribution.scala b/core/src/main/scala/org/apache/spark/util/Distribution.scala
index 33bf356..ab738c4 100644
--- a/core/src/main/scala/org/apache/spark/util/Distribution.scala
+++ b/core/src/main/scala/org/apache/spark/util/Distribution.scala
@@ -20,7 +20,8 @@ package org.apache.spark.util
import java.io.PrintStream
/**
- * Util for getting some stats from a small sample of numeric values, with some handy summary functions.
+ * Util for getting some stats from a small sample of numeric values, with some handy
+ * summary functions.
*
* Entirely in memory, not intended as a good way to compute stats over large data sets.
*
@@ -68,10 +69,11 @@ class Distribution(val data: Array[Double], val startIdx: Int, val endIdx: Int)
object Distribution {
def apply(data: Traversable[Double]): Option[Distribution] = {
- if (data.size > 0)
+ if (data.size > 0) {
Some(new Distribution(data))
- else
+ } else {
None
+ }
}
def showQuantiles(out: PrintStream = System.out, quantiles: Traversable[Double]) {
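
The Distribution scaladoc rewrap above describes an in-memory helper for quick stats over a small numeric sample. A hedged stand-alone sketch of the underlying idea, sort once and read quantiles by index (names and the nearest-rank convention here are illustrative, not the Spark implementation):

    class SmallSampleStats(data: Array[Double]) {
      require(data.nonEmpty, "no data")
      private val sorted = data.sorted

      // Nearest-rank quantile on the sorted sample; one simple convention of several.
      def quantile(p: Double): Double = {
        val idx = math.min(sorted.length - 1, (p * sorted.length).toInt)
        sorted(idx)
      }

      def summary: String =
        Seq(0.0, 0.25, 0.5, 0.75, 1.0).map(q => f"$q%.2f -> ${quantile(q)}%.2f").mkString(", ")
    }

    object SmallSampleStatsDemo {
      def main(args: Array[String]): Unit = {
        println(new SmallSampleStats(Array(5.0, 1.0, 3.0, 2.0, 4.0)).summary)
      }
    }
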
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala b/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
index b0febe9..3868ab3 100644
--- a/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
@@ -67,7 +67,8 @@ private[spark] object MetadataCleanerType extends Enumeration {
type MetadataCleanerType = Value
- def systemProperty(which: MetadataCleanerType.MetadataCleanerType) = "spark.cleaner.ttl." + which.toString
+ def systemProperty(which: MetadataCleanerType.MetadataCleanerType) =
+ "spark.cleaner.ttl." + which.toString
}
// TODO: This mutates a Conf to set properties right now, which is kind of ugly when used in the
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/util/SerializableHyperLogLog.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/SerializableHyperLogLog.scala b/core/src/main/scala/org/apache/spark/util/SerializableHyperLogLog.scala
index 8b4e7c1..2110b35 100644
--- a/core/src/main/scala/org/apache/spark/util/SerializableHyperLogLog.scala
+++ b/core/src/main/scala/org/apache/spark/util/SerializableHyperLogLog.scala
@@ -21,7 +21,8 @@ import java.io.{Externalizable, ObjectOutput, ObjectInput}
import com.clearspring.analytics.stream.cardinality.{ICardinality, HyperLogLog}
/**
- * A wrapper around [[com.clearspring.analytics.stream.cardinality.HyperLogLog]] that is serializable.
+ * A wrapper around [[com.clearspring.analytics.stream.cardinality.HyperLogLog]] that is
+ * serializable.
*/
private[spark]
class SerializableHyperLogLog(var value: ICardinality) extends Externalizable {
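
The wrapper whose scaladoc is rewrapped above makes a non-serializable cardinality sketch usable in closures by implementing Externalizable over its byte form. A dependency-free sketch of the same pattern, using a String payload so it runs without the stream-lib jar (class and field names are illustrative):

    import java.io.{Externalizable, ObjectInput, ObjectOutput}

    // Write the wrapped value as a length-prefixed byte array; read it back the same way.
    class SerializableWrapper(var value: String) extends Externalizable {
      def this() = this("") // no-arg constructor required for Externalizable deserialization

      override def writeExternal(out: ObjectOutput): Unit = {
        val bytes = value.getBytes("UTF-8")
        out.writeInt(bytes.length)
        out.write(bytes)
      }

      override def readExternal(in: ObjectInput): Unit = {
        val bytes = new Array[Byte](in.readInt())
        in.readFully(bytes)
        value = new String(bytes, "UTF-8")
      }
    }
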
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala b/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
index 3cf9489..5f86795 100644
--- a/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
+++ b/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
@@ -224,24 +224,26 @@ private[spark] object SizeEstimator extends Logging {
}
private def primitiveSize(cls: Class[_]): Long = {
- if (cls == classOf[Byte])
+ if (cls == classOf[Byte]) {
BYTE_SIZE
- else if (cls == classOf[Boolean])
+ } else if (cls == classOf[Boolean]) {
BOOLEAN_SIZE
- else if (cls == classOf[Char])
+ } else if (cls == classOf[Char]) {
CHAR_SIZE
- else if (cls == classOf[Short])
+ } else if (cls == classOf[Short]) {
SHORT_SIZE
- else if (cls == classOf[Int])
+ } else if (cls == classOf[Int]) {
INT_SIZE
- else if (cls == classOf[Long])
+ } else if (cls == classOf[Long]) {
LONG_SIZE
- else if (cls == classOf[Float])
+ } else if (cls == classOf[Float]) {
FLOAT_SIZE
- else if (cls == classOf[Double])
+ } else if (cls == classOf[Double]) {
DOUBLE_SIZE
- else throw new IllegalArgumentException(
+ } else {
+ throw new IllegalArgumentException(
"Non-primitive class " + cls + " passed to primitiveSize()")
+ }
}
/**
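
The primitiveSize change above is purely about bracing the if/else chain; the mapping itself is the fixed width of each primitive. The same mapping written as a pattern match, for comparison only (the committed code keeps the chain, and the byte widths below assume the usual JVM sizes that SizeEstimator's constants encode):

    object PrimitiveSizes {
      def primitiveSizeOf(cls: Class[_]): Long = cls match {
        case c if c == classOf[Byte] || c == classOf[Boolean] => 1
        case c if c == classOf[Char] || c == classOf[Short]   => 2
        case c if c == classOf[Int]  || c == classOf[Float]   => 4
        case c if c == classOf[Long] || c == classOf[Double]  => 8
        case other =>
          throw new IllegalArgumentException(
            "Non-primitive class " + other + " passed to primitiveSizeOf()")
      }

      def main(args: Array[String]): Unit = {
        println(primitiveSizeOf(classOf[Int]))    // 4
        println(primitiveSizeOf(classOf[Double])) // 8
      }
    }
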
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/util/StatCounter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/StatCounter.scala b/core/src/main/scala/org/apache/spark/util/StatCounter.scala
index 020d5ed..5b0d2c3 100644
--- a/core/src/main/scala/org/apache/spark/util/StatCounter.scala
+++ b/core/src/main/scala/org/apache/spark/util/StatCounter.scala
@@ -20,7 +20,8 @@ package org.apache.spark.util
/**
* A class for tracking the statistics of a set of numbers (count, mean and variance) in a
* numerically robust way. Includes support for merging two StatCounters. Based on
- * [[http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance Welford and Chan's algorithms for running variance]].
+ * [[http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance
+ * Welford and Chan's algorithms for running variance]].
*
* @constructor Initialize the StatCounter with the given values.
*/
@@ -70,7 +71,7 @@ class StatCounter(values: TraversableOnce[Double]) extends Serializable {
m2 += other.m2 + (delta * delta * n * other.n) / (n + other.n)
n += other.n
}
- this
+ this
}
}
@@ -91,10 +92,11 @@ class StatCounter(values: TraversableOnce[Double]) extends Serializable {
/** Return the variance of the values. */
def variance: Double = {
- if (n == 0)
+ if (n == 0) {
Double.NaN
- else
+ } else {
m2 / n
+ }
}
/**
@@ -102,10 +104,11 @@ class StatCounter(values: TraversableOnce[Double]) extends Serializable {
* by N-1 instead of N.
*/
def sampleVariance: Double = {
- if (n <= 1)
+ if (n <= 1) {
Double.NaN
- else
+ } else {
m2 / (n - 1)
+ }
}
/** Return the standard deviation of the values. */
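
The StatCounter hunks above rewrap the Welford/Chan reference and brace the variance guards; the merge step is the interesting part. A hedged stand-alone sketch of that parallel update, using the same delta/m2 formula as the merge method shown in the hunk, checked against a direct two-pass variance (field names mirror the class but this is a simplified version):

    object RunningVarianceDemo {
      final case class Stats(n: Long, mu: Double, m2: Double) {
        def variance: Double = if (n == 0) Double.NaN else m2 / n

        // Chan et al.'s merge of two partial (count, mean, M2) summaries.
        def merge(other: Stats): Stats = {
          if (other.n == 0) this
          else if (n == 0) other
          else {
            val delta = other.mu - mu
            val newN = n + other.n
            val newMu = mu + delta * other.n / newN
            val newM2 = m2 + other.m2 + (delta * delta * n * other.n) / newN
            Stats(newN, newMu, newM2)
          }
        }
      }

      // Two-pass summary of a full sequence, used here only to check the merge.
      def of(xs: Seq[Double]): Stats = {
        val mu = xs.sum / xs.size
        Stats(xs.size, mu, xs.map(x => (x - mu) * (x - mu)).sum)
      }

      def main(args: Array[String]): Unit = {
        val (a, b) = (Seq(1.0, 2.0, 3.0), Seq(10.0, 20.0))
        println(of(a).merge(of(b)).variance) // 50.96
        println(of(a ++ b).variance)         // 50.96, same as the merged summaries
      }
    }
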
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 861ad62..c201d0a 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -86,7 +86,8 @@ private[spark] object Utils extends Logging {
}
/** Serialize via nested stream using specific serializer */
- def serializeViaNestedStream(os: OutputStream, ser: SerializerInstance)(f: SerializationStream => Unit) = {
+ def serializeViaNestedStream(os: OutputStream, ser: SerializerInstance)(
+ f: SerializationStream => Unit) = {
val osWrapper = ser.serializeStream(new OutputStream {
def write(b: Int) = os.write(b)
@@ -100,7 +101,8 @@ private[spark] object Utils extends Logging {
}
/** Deserialize via nested stream using specific serializer */
- def deserializeViaNestedStream(is: InputStream, ser: SerializerInstance)(f: DeserializationStream => Unit) = {
+ def deserializeViaNestedStream(is: InputStream, ser: SerializerInstance)(
+ f: DeserializationStream => Unit) = {
val isWrapper = ser.deserializeStream(new InputStream {
def read(): Int = is.read()
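
The two Utils hunks above only move the curried function parameter onto its own line. For readers of the helper itself, a simplified sketch of the wrapping idea, assuming only java.io (the real method layers a Spark SerializationStream on top of the wrapper):

    import java.io.{ByteArrayOutputStream, OutputStream}

    object NestedStreamDemo {
      // Expose the underlying stream through a thin OutputStream; since close() and
      // flush() are not forwarded, closing the nested stream leaves the caller's open.
      def withNestedStream(os: OutputStream)(f: OutputStream => Unit): Unit = {
        val wrapper = new OutputStream {
          def write(b: Int): Unit = os.write(b)
          override def write(b: Array[Byte], off: Int, len: Int): Unit = os.write(b, off, len)
        }
        f(wrapper)
      }

      def main(args: Array[String]): Unit = {
        val out = new ByteArrayOutputStream()
        withNestedStream(out)(w => w.write("hello".getBytes("UTF-8")))
        withNestedStream(out)(w => w.write(" world".getBytes("UTF-8")))
        println(out.toString("UTF-8")) // hello world
      }
    }
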
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/util/Vector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Vector.scala b/core/src/main/scala/org/apache/spark/util/Vector.scala
index 83fa0bf..96da93d 100644
--- a/core/src/main/scala/org/apache/spark/util/Vector.scala
+++ b/core/src/main/scala/org/apache/spark/util/Vector.scala
@@ -26,24 +26,27 @@ class Vector(val elements: Array[Double]) extends Serializable {
def apply(index: Int) = elements(index)
def + (other: Vector): Vector = {
- if (length != other.length)
+ if (length != other.length) {
throw new IllegalArgumentException("Vectors of different length")
+ }
Vector(length, i => this(i) + other(i))
}
def add(other: Vector) = this + other
def - (other: Vector): Vector = {
- if (length != other.length)
+ if (length != other.length) {
throw new IllegalArgumentException("Vectors of different length")
+ }
Vector(length, i => this(i) - other(i))
}
def subtract(other: Vector) = this - other
def dot(other: Vector): Double = {
- if (length != other.length)
+ if (length != other.length) {
throw new IllegalArgumentException("Vectors of different length")
+ }
var ans = 0.0
var i = 0
while (i < length) {
@@ -60,10 +63,12 @@ class Vector(val elements: Array[Double]) extends Serializable {
* @return
*/
def plusDot(plus: Vector, other: Vector): Double = {
- if (length != other.length)
+ if (length != other.length) {
throw new IllegalArgumentException("Vectors of different length")
- if (length != plus.length)
+ }
+ if (length != plus.length) {
throw new IllegalArgumentException("Vectors of different length")
+ }
var ans = 0.0
var i = 0
while (i < length) {
@@ -74,8 +79,9 @@ class Vector(val elements: Array[Double]) extends Serializable {
}
def += (other: Vector): Vector = {
- if (length != other.length)
+ if (length != other.length) {
throw new IllegalArgumentException("Vectors of different length")
+ }
var i = 0
while (i < length) {
elements(i) += other(i)
@@ -131,7 +137,8 @@ object Vector {
* Creates this [[org.apache.spark.util.Vector]] of given length containing random numbers
* between 0.0 and 1.0. Optional [[scala.util.Random]] number generator can be provided.
*/
- def random(length: Int, random: Random = new XORShiftRandom()) = Vector(length, _ => random.nextDouble())
+ def random(length: Int, random: Random = new XORShiftRandom()) =
+ Vector(length, _ => random.nextDouble())
class Multiplier(num: Double) {
def * (vec: Vector) = vec * num
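
The Vector hunks above brace the length guards; the operations themselves are straightforward element-wise loops. A stand-alone sketch of the guarded dot product in the same shape (plain arrays rather than the Spark Vector class):

    object DotProductDemo {
      def dot(a: Array[Double], b: Array[Double]): Double = {
        if (a.length != b.length) {
          throw new IllegalArgumentException("Vectors of different length")
        }
        var ans = 0.0
        var i = 0
        while (i < a.length) {
          ans += a(i) * b(i)
          i += 1
        }
        ans
      }

      def main(args: Array[String]): Unit = {
        println(dot(Array(1.0, 2.0, 3.0), Array(4.0, 5.0, 6.0))) // 32.0
      }
    }
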
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/util/collection/BitSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/BitSet.scala b/core/src/main/scala/org/apache/spark/util/collection/BitSet.scala
index 856eb77..c9cf512 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/BitSet.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/BitSet.scala
@@ -123,7 +123,7 @@ class BitSet(numBits: Int) extends Serializable {
override def hasNext: Boolean = ind >= 0
override def next() = {
val tmp = ind
- ind = nextSetBit(ind+1)
+ ind = nextSetBit(ind + 1)
tmp
}
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 7eb300d..59ba1e4 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -280,7 +280,7 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
/**
* Select a key with the minimum hash, then combine all values with the same key from all
- * input streams
+ * input streams.
*/
override def next(): (K, C) = {
// Select a key from the StreamBuffer that holds the lowest key hash
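
The comment fix above is just a trailing period, but the operation it documents, picking the minimum-hash key across the stream buffers and folding together every combiner held for that key, is worth a sketch. A simplified in-memory version (the real iterator streams from hash-sorted spilled files and merges with the user-supplied combine function; the list buffers and Int addition below are illustrative):

    object MinHashMergeDemo {
      // One step of the merge: find the key with the smallest hash among the buffer
      // heads, then combine every value held for that key across all buffers.
      def nextCombined(buffers: Seq[List[(String, Int)]]): Option[(String, Int)] = {
        val heads = buffers.flatMap(_.headOption)
        if (heads.isEmpty) None
        else {
          val minKey = heads.minBy(_._1.hashCode)._1
          val combined = buffers.flatten.filter(_._1 == minKey).map(_._2).sum
          Some(minKey -> combined)
        }
      }

      def main(args: Array[String]): Unit = {
        val buffers = Seq(List("a" -> 1, "b" -> 2), List("a" -> 10), List("b" -> 5))
        println(nextCombined(buffers)) // Some((a,11)): both values for "a" are combined
      }
    }
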
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala b/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala
index 5ded5d0..148c12e 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala
@@ -187,7 +187,7 @@ class OpenHashSet[@specialized(Long, Int) T: ClassTag](
override def hasNext: Boolean = pos != INVALID_POS
override def next(): T = {
val tmp = getValue(pos)
- pos = nextPos(pos+1)
+ pos = nextPos(pos + 1)
tmp
}
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/examples/src/main/scala/org/apache/spark/streaming/examples/StatefulNetworkWordCount.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/StatefulNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/StatefulNetworkWordCount.scala
index 88f1cef..c2d84a8 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/StatefulNetworkWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/StatefulNetworkWordCount.scala
@@ -19,18 +19,21 @@ package org.apache.spark.streaming.examples
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
-
+// scalastyle:off
/**
- * Counts words cumulatively in UTF8 encoded, '\n' delimited text received from the network every second.
+ * Counts words cumulatively in UTF8 encoded, '\n' delimited text received from the network every
+ * second.
* Usage: StatefulNetworkWordCount <master> <hostname> <port>
* <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1.
- * <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
+ * <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive
+ * data.
*
* To run this on your local machine, you need to first run a Netcat server
* `$ nc -lk 9999`
* and then run the example
* `$ ./bin/run-example org.apache.spark.streaming.examples.StatefulNetworkWordCount local[2] localhost 9999`
*/
+// scalastyle:on
object StatefulNetworkWordCount {
def main(args: Array[String]) {
if (args.length < 3) {
@@ -50,8 +53,8 @@ object StatefulNetworkWordCount {
}
// Create the context with a 1 second batch size
- val ssc = new StreamingContext(args(0), "NetworkWordCumulativeCountUpdateStateByKey", Seconds(1),
- System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
+ val ssc = new StreamingContext(args(0), "NetworkWordCumulativeCountUpdateStateByKey",
+ Seconds(1), System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
ssc.checkpoint(".")
// Create a NetworkInputDStream on target ip:port and count the
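
The constructor-call rewrap above is cosmetic; the example's core is the update function handed to updateStateByKey, which adds each batch's counts to a word's running total. A sketch of such a function in isolation, shown as a plain function so it runs without a StreamingContext:

    object UpdateFuncDemo {
      // Add this batch's counts for a key to whatever running total was stored before.
      val updateFunc = (values: Seq[Int], state: Option[Int]) =>
        Some(values.sum + state.getOrElse(0))

      def main(args: Array[String]): Unit = {
        println(updateFunc(Seq(1, 1, 1), None))  // Some(3): first batch for this word
        println(updateFunc(Seq(2), Some(3)))     // Some(5): running total carried forward
      }
    }
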
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala
index a0094d4..c6215fd 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala
@@ -23,20 +23,24 @@ import com.twitter.algebird.HyperLogLog._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.twitter._
-
+// scalastyle:off
/**
* Illustrates the use of the HyperLogLog algorithm, from Twitter's Algebird library, to compute
* a windowed and global estimate of the unique user IDs occurring in a Twitter stream.
* <p>
* <p>
- * This <a href="http://highlyscalable.wordpress.com/2012/05/01/probabilistic-structures-web-analytics-data-mining/">
+ * This <a href= "http://highlyscalable.wordpress.com/2012/05/01/probabilistic-structures-web-analytics-data
+ * -mining/">
* blog post</a> and this
- * <a href="http://highscalability.com/blog/2012/4/5/big-data-counting-how-to-count-a-billion-distinct-objects-us.html">blog post</a>
- * have good overviews of HyperLogLog (HLL). HLL is a memory-efficient datastructure for estimating
- * the cardinality of a data stream, i.e. the number of unique elements.
+ * <a href= "http://highscalability.com/blog/2012/4/5/big-data-counting-how-to-count-a-billion-distinct-objects-us.html">
+ * blog post</a>
+ * have good overviews of HyperLogLog (HLL). HLL is a memory-efficient data structure for
+ * estimating the cardinality of a data stream, i.e. the number of unique elements.
* <p><p>
- * Algebird's implementation is a monoid, so we can succinctly merge two HLL instances in the reduce operation.
+ * Algebird's implementation is a monoid, so we can succinctly merge two HLL instances in the
+ * reduce operation.
*/
+// scalastyle:on
object TwitterAlgebirdHLL {
def main(args: Array[String]) {
if (args.length < 1) {
@@ -82,7 +86,8 @@ object TwitterAlgebirdHLL {
userSet ++= partial
println("Exact distinct users this batch: %d".format(partial.size))
println("Exact distinct users overall: %d".format(userSet.size))
- println("Error rate: %2.5f%%".format(((globalHll.estimatedSize / userSet.size.toDouble) - 1) * 100))
+ println("Error rate: %2.5f%%".format(((globalHll.estimatedSize / userSet.size.toDouble) - 1
+ ) * 100))
}
})
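
The wrapped println above computes the relative error of the HLL estimate as a percentage. A dependency-free sketch of just that arithmetic (the real code takes estimatedSize from an Algebird HLL monoid; the sample numbers below are made up):

    object HllErrorRateDemo {
      // Relative error of an estimate against the exact count, as a percentage.
      def errorRatePercent(estimated: Double, exact: Long): Double =
        ((estimated / exact.toDouble) - 1) * 100

      def main(args: Array[String]): Unit = {
        // e.g. an estimate of 10150 distinct users against an exact count of 10000 is +1.5%
        println("Error rate: %2.5f%%".format(errorRatePercent(10150.0, 10000L)))
      }
    }
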
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewGenerator.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewGenerator.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewGenerator.scala
index a260098..0ac46c3 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewGenerator.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewGenerator.scala
@@ -36,6 +36,7 @@ object PageView extends Serializable {
}
}
+// scalastyle:off
/** Generates streaming events to simulate page views on a website.
*
* This should be used in tandem with PageViewStream.scala. Example:
@@ -44,7 +45,8 @@ object PageView extends Serializable {
*
* When running this, you may want to set the root logging level to ERROR in
* conf/log4j.properties to reduce the verbosity of the output.
- * */
+ */
+// scalastyle:on
object PageViewGenerator {
val pages = Map("http://foo.com/" -> .7,
"http://foo.com/news" -> 0.2,
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewStream.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewStream.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewStream.scala
index bb44bc3..2b130fb 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewStream.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewStream.scala
@@ -21,7 +21,7 @@ import org.apache.spark.SparkContext._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.examples.StreamingExamples
-
+// scalastyle:off
/** Analyses a streaming dataset of web page views. This class demonstrates several types of
* operators available in Spark streaming.
*
@@ -29,6 +29,7 @@ import org.apache.spark.streaming.examples.StreamingExamples
* $ ./bin/run-example org.apache.spark.streaming.examples.clickstream.PageViewGenerator 44444 10
* $ ./bin/run-example org.apache.spark.streaming.examples.clickstream.PageViewStream errorRatePerZipCode localhost 44444
*/
+// scalastyle:on
object PageViewStream {
def main(args: Array[String]) {
if (args.length != 3) {
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
index a2cd49c..c2d9dcb 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
@@ -37,7 +37,8 @@ import org.apache.spark.streaming.dstream._
/**
* Input stream that pulls messages from a Kafka Broker.
*
- * @param kafkaParams Map of kafka configuration paramaters. See: http://kafka.apache.org/configuration.html
+ * @param kafkaParams Map of kafka configuration parameters.
+ * See: http://kafka.apache.org/configuration.html
* @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
* in its own thread.
* @param storageLevel RDD storage level.
@@ -134,12 +135,15 @@ class KafkaReceiver[
}
}
- // It is our responsibility to delete the consumer group when specifying autooffset.reset. This is because
- // Kafka 0.7.2 only honors this param when the group is not in zookeeper.
+ // It is our responsibility to delete the consumer group when specifying autooffset.reset. This
+ // is because Kafka 0.7.2 only honors this param when the group is not in zookeeper.
//
- // The kafka high level consumer doesn't expose setting offsets currently, this is a trick copied from Kafkas'
- // ConsoleConsumer. See code related to 'autooffset.reset' when it is set to 'smallest'/'largest':
+ // The kafka high level consumer doesn't expose setting offsets currently, this is a trick copied
+ // from Kafka's ConsoleConsumer. See code related to 'autooffset.reset' when it is set to
+ // 'smallest'/'largest':
+ // scalastyle:off
// https://github.com/apache/kafka/blob/0.7.2/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
+ // scalastyle:on
private def tryZookeeperConsumerGroupCleanup(zkUrl: String, groupId: String) {
try {
val dir = "/consumers/" + groupId
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
index 15a2daa..5472d0c 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
@@ -113,7 +113,8 @@ object KafkaUtils {
): JavaPairDStream[String, String] = {
implicit val cmt: ClassTag[String] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
- createStream(jssc.ssc, zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
+ createStream(jssc.ssc, zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*),
+ storageLevel)
}
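
The createStream rewrap above keeps the idiom that turns the Java topics map into an immutable Scala Map of Ints. A small stand-alone sketch of that conversion, assuming only the standard library:

    import scala.collection.JavaConverters._

    object JavaMapConversionDemo {
      def main(args: Array[String]): Unit = {
        val jmap = new java.util.HashMap[String, java.lang.Integer]()
        jmap.put("topicA", Integer.valueOf(1))
        jmap.put("topicB", Integer.valueOf(2))
        // Same shape as the call above: view the Java map, unbox each Integer, build a Map.
        val topics: Map[String, Int] = Map(jmap.asScala.mapValues(_.intValue()).toSeq: _*)
        println(topics)
      }
    }
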
/**
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b69f8b2a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala
----------------------------------------------------------------------
diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala
index 960c6a3..6acba25 100644
--- a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala
+++ b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala
@@ -34,8 +34,8 @@ private[streaming] class ZeroMQReceiver[T: ClassTag](publisherUrl: String,
bytesToObjects: Seq[ByteString] => Iterator[T])
extends Actor with Receiver with Logging {
- override def preStart() = ZeroMQExtension(context.system).newSocket(SocketType.Sub, Listener(self),
- Connect(publisherUrl), subscribe)
+ override def preStart() = ZeroMQExtension(context.system)
+ .newSocket(SocketType.Sub, Listener(self), Connect(publisherUrl), subscribe)
def receive: Receive = {