You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2013/12/30 16:44:40 UTC
[1/3] git commit: Changed naming of StageCompleted event to be consistent
Updated Branches:
refs/heads/master 72a17b69f -> 50e3b8ec4
Changed naming of StageCompleted event to be consistent
The rest of the SparkListener events are named with "SparkListener"
as the prefix of the name; this commit renames the StageCompleted
event to SparkListenerStageCompleted for consistency.
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/b4619e50
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/b4619e50
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/b4619e50
Branch: refs/heads/master
Commit: b4619e509bc3e06baa3b031ef2c1981d3bf02cbd
Parents: 19672dc
Author: Kay Ousterhout <ka...@gmail.com>
Authored: Fri Dec 27 17:44:32 2013 -0800
Committer: Kay Ousterhout <ka...@gmail.com>
Committed: Fri Dec 27 17:45:20 2013 -0800
----------------------------------------------------------------------
.../apache/spark/scheduler/DAGScheduler.scala | 2 +-
.../org/apache/spark/scheduler/JobLogger.scala | 2 +-
.../apache/spark/scheduler/SparkListener.scala | 22 +++++++++++++-------
.../spark/scheduler/SparkListenerBus.scala | 2 +-
.../spark/ui/jobs/JobProgressListener.scala | 2 +-
.../apache/spark/scheduler/JobLoggerSuite.scala | 2 +-
.../spark/scheduler/SparkListenerSuite.scala | 2 +-
7 files changed, 20 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b4619e50/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index c48a3d6..7603eb2 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -827,7 +827,7 @@ class DAGScheduler(
}
logInfo("%s (%s) finished in %s s".format(stage, stage.name, serviceTime))
stageToInfos(stage).completionTime = Some(System.currentTimeMillis())
- listenerBus.post(StageCompleted(stageToInfos(stage)))
+ listenerBus.post(SparkListenerStageCompleted(stageToInfos(stage)))
running -= stage
}
event.reason match {
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b4619e50/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
index 6092783..7858080 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
@@ -297,7 +297,7 @@ class JobLogger(val user: String, val logDirName: String)
* When stage is completed, record stage completion status
* @param stageCompleted Stage completed event
*/
- override def onStageCompleted(stageCompleted: StageCompleted) {
+ override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) {
stageLogInfo(stageCompleted.stage.stageId, "STAGE_ID=%d STATUS=COMPLETED".format(
stageCompleted.stage.stageId))
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b4619e50/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index ee63b3c..3becb4f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -27,7 +27,7 @@ sealed trait SparkListenerEvents
case class SparkListenerStageSubmitted(stage: StageInfo, properties: Properties)
extends SparkListenerEvents
-case class StageCompleted(val stage: StageInfo) extends SparkListenerEvents
+case class SparkListenerStageCompleted(val stage: StageInfo) extends SparkListenerEvents
case class SparkListenerTaskStart(task: Task[_], taskInfo: TaskInfo) extends SparkListenerEvents
@@ -47,7 +47,7 @@ trait SparkListener {
/**
* Called when a stage is completed, with information on the completed stage
*/
- def onStageCompleted(stageCompleted: StageCompleted) { }
+ def onStageCompleted(stageCompleted: SparkListenerStageCompleted) { }
/**
* Called when a stage is submitted
@@ -86,7 +86,7 @@ trait SparkListener {
* Simple SparkListener that logs a few summary statistics when each stage completes
*/
class StatsReportListener extends SparkListener with Logging {
- override def onStageCompleted(stageCompleted: StageCompleted) {
+ override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) {
import org.apache.spark.scheduler.StatsReportListener._
implicit val sc = stageCompleted
this.logInfo("Finished stage: " + stageCompleted.stage)
@@ -119,13 +119,19 @@ object StatsReportListener extends Logging {
val probabilities = percentiles.map{_ / 100.0}
val percentilesHeader = "\t" + percentiles.mkString("%\t") + "%"
- def extractDoubleDistribution(stage:StageCompleted, getMetric: (TaskInfo,TaskMetrics) => Option[Double]): Option[Distribution] = {
+ def extractDoubleDistribution(
+ stage:SparkListenerStageCompleted,
+ getMetric: (TaskInfo,TaskMetrics) => Option[Double])
+ : Option[Distribution] = {
Distribution(stage.stage.taskInfos.flatMap {
case ((info,metric)) => getMetric(info, metric)})
}
//is there some way to setup the types that I can get rid of this completely?
- def extractLongDistribution(stage:StageCompleted, getMetric: (TaskInfo,TaskMetrics) => Option[Long]): Option[Distribution] = {
+ def extractLongDistribution(
+ stage:SparkListenerStageCompleted,
+ getMetric: (TaskInfo,TaskMetrics) => Option[Long])
+ : Option[Distribution] = {
extractDoubleDistribution(stage, (info, metric) => getMetric(info,metric).map{_.toDouble})
}
@@ -147,12 +153,12 @@ object StatsReportListener extends Logging {
}
def showDistribution(heading:String, format: String, getMetric: (TaskInfo,TaskMetrics) => Option[Double])
- (implicit stage: StageCompleted) {
+ (implicit stage: SparkListenerStageCompleted) {
showDistribution(heading, extractDoubleDistribution(stage, getMetric), format)
}
def showBytesDistribution(heading:String, getMetric: (TaskInfo,TaskMetrics) => Option[Long])
- (implicit stage: StageCompleted) {
+ (implicit stage: SparkListenerStageCompleted) {
showBytesDistribution(heading, extractLongDistribution(stage, getMetric))
}
@@ -169,7 +175,7 @@ object StatsReportListener extends Logging {
}
def showMillisDistribution(heading: String, getMetric: (TaskInfo, TaskMetrics) => Option[Long])
- (implicit stage: StageCompleted) {
+ (implicit stage: SparkListenerStageCompleted) {
showMillisDistribution(heading, extractLongDistribution(stage, getMetric))
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b4619e50/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
index 85687ea..e7defd7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
@@ -41,7 +41,7 @@ private[spark] class SparkListenerBus() extends Logging {
event match {
case stageSubmitted: SparkListenerStageSubmitted =>
sparkListeners.foreach(_.onStageSubmitted(stageSubmitted))
- case stageCompleted: StageCompleted =>
+ case stageCompleted: SparkListenerStageCompleted =>
sparkListeners.foreach(_.onStageCompleted(stageCompleted))
case jobStart: SparkListenerJobStart =>
sparkListeners.foreach(_.onJobStart(jobStart))
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b4619e50/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index 2e51dd5..058bc2a 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -61,7 +61,7 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
override def onJobStart(jobStart: SparkListenerJobStart) {}
- override def onStageCompleted(stageCompleted: StageCompleted) = synchronized {
+ override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) = synchronized {
val stage = stageCompleted.stage
poolToActiveStages(stageIdToPool(stage.stageId)) -= stage
activeStages -= stage
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b4619e50/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala
index 002368f..d0bd20f 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala
@@ -117,7 +117,7 @@ class JobLoggerSuite extends FunSuite with LocalSparkContext with ShouldMatchers
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = onTaskEndCount += 1
override def onJobEnd(jobEnd: SparkListenerJobEnd) = onJobEndCount += 1
override def onJobStart(jobStart: SparkListenerJobStart) = onJobStartCount += 1
- override def onStageCompleted(stageCompleted: StageCompleted) = onStageCompletedCount += 1
+ override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) = onStageCompletedCount += 1
override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) = onStageSubmittedCount += 1
}
sc.addSparkListener(joblogger)
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b4619e50/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index d4320e5..1a16e43 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -174,7 +174,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
class SaveStageInfo extends SparkListener {
val stageInfos = Buffer[StageInfo]()
- override def onStageCompleted(stage: StageCompleted) {
+ override def onStageCompleted(stage: SparkListenerStageCompleted) {
stageInfos += stage.stage
}
}
[3/3] git commit: Merge pull request #308 from kayousterhout/stage_naming
Posted by pw...@apache.org.
Merge pull request #308 from kayousterhout/stage_naming
Changed naming of StageCompleted event to be consistent
The rest of the SparkListener events are named with "SparkListener"
as the prefix of the name; this commit renames the StageCompleted
event to SparkListenerStageCompleted for consistency.
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/50e3b8ec
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/50e3b8ec
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/50e3b8ec
Branch: refs/heads/master
Commit: 50e3b8ec4c8150f1cfc6b92f8871f520adf2cfda
Parents: 72a17b6 c2c1af3
Author: Patrick Wendell <pw...@gmail.com>
Authored: Mon Dec 30 07:44:26 2013 -0800
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Mon Dec 30 07:44:26 2013 -0800
----------------------------------------------------------------------
.../apache/spark/scheduler/DAGScheduler.scala | 2 +-
.../org/apache/spark/scheduler/JobLogger.scala | 2 +-
.../apache/spark/scheduler/SparkListener.scala | 20 ++++++++++++--------
.../spark/scheduler/SparkListenerBus.scala | 2 +-
.../spark/ui/jobs/JobProgressListener.scala | 2 +-
.../apache/spark/scheduler/JobLoggerSuite.scala | 2 +-
.../spark/scheduler/SparkListenerSuite.scala | 2 +-
7 files changed, 18 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/50e3b8ec/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
----------------------------------------------------------------------
[2/3] git commit: Updated code style according to Patrick's comments
Posted by pw...@apache.org.
Updated code style according to Patrick's comments
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/c2c1af39
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/c2c1af39
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/c2c1af39
Branch: refs/heads/master
Commit: c2c1af39f593cd00d29368efe2dbb8c0444f624d
Parents: b4619e5
Author: Kay Ousterhout <ka...@gmail.com>
Authored: Sun Dec 29 21:10:08 2013 -0800
Committer: Kay Ousterhout <ka...@gmail.com>
Committed: Sun Dec 29 21:10:08 2013 -0800
----------------------------------------------------------------------
.../main/scala/org/apache/spark/scheduler/SparkListener.scala | 6 ++----
1 file changed, 2 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/c2c1af39/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index 3becb4f..627995c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -119,8 +119,7 @@ object StatsReportListener extends Logging {
val probabilities = percentiles.map{_ / 100.0}
val percentilesHeader = "\t" + percentiles.mkString("%\t") + "%"
- def extractDoubleDistribution(
- stage:SparkListenerStageCompleted,
+ def extractDoubleDistribution(stage: SparkListenerStageCompleted,
getMetric: (TaskInfo,TaskMetrics) => Option[Double])
: Option[Distribution] = {
Distribution(stage.stage.taskInfos.flatMap {
@@ -128,8 +127,7 @@ object StatsReportListener extends Logging {
}
//is there some way to setup the types that I can get rid of this completely?
- def extractLongDistribution(
- stage:SparkListenerStageCompleted,
+ def extractLongDistribution(stage: SparkListenerStageCompleted,
getMetric: (TaskInfo,TaskMetrics) => Option[Long])
: Option[Distribution] = {
extractDoubleDistribution(stage, (info, metric) => getMetric(info,metric).map{_.toDouble})