Posted to commits@spark.apache.org by pw...@apache.org on 2013/12/30 16:44:40 UTC

[1/3] git commit: Changed naming of StageCompleted event to be consistent

Updated Branches:
  refs/heads/master 72a17b69f -> 50e3b8ec4


Changed naming of StageCompleted event to be consistent

The rest of the SparkListener events are named with a "SparkListener"
prefix; this commit renames the StageCompleted event to
SparkListenerStageCompleted for consistency.
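
For code outside Spark that implements SparkListener, the rename only
changes the parameter type of the onStageCompleted hook. A minimal
sketch of an updated listener (the class name and println body are
hypothetical; the trait, the renamed event, and the stage.stageId field
are taken from the hunks below):

  import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted}

  // Hypothetical listener: logs the id of each completed stage using the
  // renamed event type (formerly StageCompleted).
  class StageIdLogger extends SparkListener {
    override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) {
      println("completed stage " + stageCompleted.stage.stageId)
    }
  }

Such a listener is registered the same way as before, e.g. with
sc.addSparkListener(new StageIdLogger), as in the JobLoggerSuite hunk
further down.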


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/b4619e50
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/b4619e50
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/b4619e50

Branch: refs/heads/master
Commit: b4619e509bc3e06baa3b031ef2c1981d3bf02cbd
Parents: 19672dc
Author: Kay Ousterhout <ka...@gmail.com>
Authored: Fri Dec 27 17:44:32 2013 -0800
Committer: Kay Ousterhout <ka...@gmail.com>
Committed: Fri Dec 27 17:45:20 2013 -0800

----------------------------------------------------------------------
 .../apache/spark/scheduler/DAGScheduler.scala   |  2 +-
 .../org/apache/spark/scheduler/JobLogger.scala  |  2 +-
 .../apache/spark/scheduler/SparkListener.scala  | 22 +++++++++++++-------
 .../spark/scheduler/SparkListenerBus.scala      |  2 +-
 .../spark/ui/jobs/JobProgressListener.scala     |  2 +-
 .../apache/spark/scheduler/JobLoggerSuite.scala |  2 +-
 .../spark/scheduler/SparkListenerSuite.scala    |  2 +-
 7 files changed, 20 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b4619e50/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index c48a3d6..7603eb2 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -827,7 +827,7 @@ class DAGScheduler(
       }
       logInfo("%s (%s) finished in %s s".format(stage, stage.name, serviceTime))
       stageToInfos(stage).completionTime = Some(System.currentTimeMillis())
-      listenerBus.post(StageCompleted(stageToInfos(stage)))
+      listenerBus.post(SparkListenerStageCompleted(stageToInfos(stage)))
       running -= stage
     }
     event.reason match {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b4619e50/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
index 6092783..7858080 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
@@ -297,7 +297,7 @@ class JobLogger(val user: String, val logDirName: String)
    * When stage is completed, record stage completion status
    * @param stageCompleted Stage completed event
    */
-  override def onStageCompleted(stageCompleted: StageCompleted) {
+  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) {
     stageLogInfo(stageCompleted.stage.stageId, "STAGE_ID=%d STATUS=COMPLETED".format(
         stageCompleted.stage.stageId))
   }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b4619e50/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index ee63b3c..3becb4f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -27,7 +27,7 @@ sealed trait SparkListenerEvents
 case class SparkListenerStageSubmitted(stage: StageInfo, properties: Properties)
      extends SparkListenerEvents
 
-case class StageCompleted(val stage: StageInfo) extends SparkListenerEvents
+case class SparkListenerStageCompleted(val stage: StageInfo) extends SparkListenerEvents
 
 case class SparkListenerTaskStart(task: Task[_], taskInfo: TaskInfo) extends SparkListenerEvents
 
@@ -47,7 +47,7 @@ trait SparkListener {
   /**
    * Called when a stage is completed, with information on the completed stage
    */
-  def onStageCompleted(stageCompleted: StageCompleted) { }
+  def onStageCompleted(stageCompleted: SparkListenerStageCompleted) { }
 
   /**
    * Called when a stage is submitted
@@ -86,7 +86,7 @@ trait SparkListener {
  * Simple SparkListener that logs a few summary statistics when each stage completes
  */
 class StatsReportListener extends SparkListener with Logging {
-  override def onStageCompleted(stageCompleted: StageCompleted) {
+  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) {
     import org.apache.spark.scheduler.StatsReportListener._
     implicit val sc = stageCompleted
     this.logInfo("Finished stage: " + stageCompleted.stage)
@@ -119,13 +119,19 @@ object StatsReportListener extends Logging {
   val probabilities = percentiles.map{_ / 100.0}
   val percentilesHeader = "\t" + percentiles.mkString("%\t") + "%"
 
-  def extractDoubleDistribution(stage:StageCompleted, getMetric: (TaskInfo,TaskMetrics) => Option[Double]): Option[Distribution] = {
+  def extractDoubleDistribution(
+      stage:SparkListenerStageCompleted,
+      getMetric: (TaskInfo,TaskMetrics) => Option[Double])
+    : Option[Distribution] = {
     Distribution(stage.stage.taskInfos.flatMap {
       case ((info,metric)) => getMetric(info, metric)})
   }
 
   //is there some way to setup the types that I can get rid of this completely?
-  def extractLongDistribution(stage:StageCompleted, getMetric: (TaskInfo,TaskMetrics) => Option[Long]): Option[Distribution] = {
+  def extractLongDistribution(
+      stage:SparkListenerStageCompleted,
+      getMetric: (TaskInfo,TaskMetrics) => Option[Long])
+    : Option[Distribution] = {
     extractDoubleDistribution(stage, (info, metric) => getMetric(info,metric).map{_.toDouble})
   }
 
@@ -147,12 +153,12 @@ object StatsReportListener extends Logging {
   }
 
   def showDistribution(heading:String, format: String, getMetric: (TaskInfo,TaskMetrics) => Option[Double])
-    (implicit stage: StageCompleted) {
+    (implicit stage: SparkListenerStageCompleted) {
     showDistribution(heading, extractDoubleDistribution(stage, getMetric), format)
   }
 
   def showBytesDistribution(heading:String, getMetric: (TaskInfo,TaskMetrics) => Option[Long])
-    (implicit stage: StageCompleted) {
+    (implicit stage: SparkListenerStageCompleted) {
     showBytesDistribution(heading, extractLongDistribution(stage, getMetric))
   }
 
@@ -169,7 +175,7 @@ object StatsReportListener extends Logging {
   }
 
   def showMillisDistribution(heading: String, getMetric: (TaskInfo, TaskMetrics) => Option[Long])
-    (implicit stage: StageCompleted) {
+    (implicit stage: SparkListenerStageCompleted) {
     showMillisDistribution(heading, extractLongDistribution(stage, getMetric))
   }
 

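As the hunks above show, the show*Distribution helpers in
StatsReportListener take the completed-stage event as an implicit
parameter, so a caller declares it implicit once per callback rather
than passing it to every helper. A rough sketch of that pattern from a
custom listener (the "task runtime" metric via info.duration is an
illustrative assumption, not part of this commit):

  import org.apache.spark.executor.TaskMetrics
  import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted, StatsReportListener, TaskInfo}

  class TaskRuntimeListener extends SparkListener {
    override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) {
      // Declaring the event implicit lets the helper resolve its
      // (implicit stage: SparkListenerStageCompleted) parameter automatically.
      implicit val stage = stageCompleted
      StatsReportListener.showMillisDistribution(
        "task runtime:", (info: TaskInfo, _: TaskMetrics) => Some(info.duration))
    }
  }
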
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b4619e50/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
index 85687ea..e7defd7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
@@ -41,7 +41,7 @@ private[spark] class SparkListenerBus() extends Logging {
         event match {
           case stageSubmitted: SparkListenerStageSubmitted =>
             sparkListeners.foreach(_.onStageSubmitted(stageSubmitted))
-          case stageCompleted: StageCompleted =>
+          case stageCompleted: SparkListenerStageCompleted =>
             sparkListeners.foreach(_.onStageCompleted(stageCompleted))
           case jobStart: SparkListenerJobStart =>
             sparkListeners.foreach(_.onJobStart(jobStart))

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b4619e50/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index 2e51dd5..058bc2a 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -61,7 +61,7 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
 
   override def onJobStart(jobStart: SparkListenerJobStart) {}
 
-  override def onStageCompleted(stageCompleted: StageCompleted) = synchronized {
+  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) = synchronized {
     val stage = stageCompleted.stage
     poolToActiveStages(stageIdToPool(stage.stageId)) -= stage
     activeStages -= stage

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b4619e50/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala
index 002368f..d0bd20f 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala
@@ -117,7 +117,7 @@ class JobLoggerSuite extends FunSuite with LocalSparkContext with ShouldMatchers
       override def onTaskEnd(taskEnd: SparkListenerTaskEnd)  = onTaskEndCount += 1
       override def onJobEnd(jobEnd: SparkListenerJobEnd) = onJobEndCount += 1
       override def onJobStart(jobStart: SparkListenerJobStart) = onJobStartCount += 1
-      override def onStageCompleted(stageCompleted: StageCompleted) = onStageCompletedCount += 1
+      override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) = onStageCompletedCount += 1
       override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) = onStageSubmittedCount += 1
     }
     sc.addSparkListener(joblogger)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b4619e50/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index d4320e5..1a16e43 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -174,7 +174,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
 
   class SaveStageInfo extends SparkListener {
     val stageInfos = Buffer[StageInfo]()
-    override def onStageCompleted(stage: StageCompleted) {
+    override def onStageCompleted(stage: SparkListenerStageCompleted) {
       stageInfos += stage.stage
     }
   }
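
For completeness, a rough sketch of wiring such a listener into a
driver program. Only sc.addSparkListener and the renamed event type
come from this thread; the app name, the job, and the crude wait for
the asynchronous listener bus are illustrative:

  import org.apache.spark.SparkContext
  import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted}

  object ListenerDemo {
    def main(args: Array[String]) {
      val sc = new SparkContext("local", "listener-demo")
      var completedStages = 0
      sc.addSparkListener(new SparkListener {
        override def onStageCompleted(stage: SparkListenerStageCompleted) {
          completedStages += 1
        }
      })
      sc.parallelize(1 to 100).count()  // runs a single stage
      Thread.sleep(1000)                // listener events are delivered asynchronously
      println("completed stages: " + completedStages)
      sc.stop()
    }
  }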


[3/3] git commit: Merge pull request #308 from kayousterhout/stage_naming

Posted by pw...@apache.org.
Merge pull request #308 from kayousterhout/stage_naming

Changed naming of StageCompleted event to be consistent

The rest of the SparkListener events are named with a "SparkListener"
prefix; this commit renames the StageCompleted event to
SparkListenerStageCompleted for consistency.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/50e3b8ec
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/50e3b8ec
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/50e3b8ec

Branch: refs/heads/master
Commit: 50e3b8ec4c8150f1cfc6b92f8871f520adf2cfda
Parents: 72a17b6 c2c1af3
Author: Patrick Wendell <pw...@gmail.com>
Authored: Mon Dec 30 07:44:26 2013 -0800
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Mon Dec 30 07:44:26 2013 -0800

----------------------------------------------------------------------
 .../apache/spark/scheduler/DAGScheduler.scala   |  2 +-
 .../org/apache/spark/scheduler/JobLogger.scala  |  2 +-
 .../apache/spark/scheduler/SparkListener.scala  | 20 ++++++++++++--------
 .../spark/scheduler/SparkListenerBus.scala      |  2 +-
 .../spark/ui/jobs/JobProgressListener.scala     |  2 +-
 .../apache/spark/scheduler/JobLoggerSuite.scala |  2 +-
 .../spark/scheduler/SparkListenerSuite.scala    |  2 +-
 7 files changed, 18 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/50e3b8ec/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
----------------------------------------------------------------------


[2/3] git commit: Updated code style according to Patrick's comments

Posted by pw...@apache.org.
Updated code style according to Patrick's comments


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/c2c1af39
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/c2c1af39
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/c2c1af39

Branch: refs/heads/master
Commit: c2c1af39f593cd00d29368efe2dbb8c0444f624d
Parents: b4619e5
Author: Kay Ousterhout <ka...@gmail.com>
Authored: Sun Dec 29 21:10:08 2013 -0800
Committer: Kay Ousterhout <ka...@gmail.com>
Committed: Sun Dec 29 21:10:08 2013 -0800

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/scheduler/SparkListener.scala  | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/c2c1af39/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index 3becb4f..627995c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -119,8 +119,7 @@ object StatsReportListener extends Logging {
   val probabilities = percentiles.map{_ / 100.0}
   val percentilesHeader = "\t" + percentiles.mkString("%\t") + "%"
 
-  def extractDoubleDistribution(
-      stage:SparkListenerStageCompleted,
+  def extractDoubleDistribution(stage: SparkListenerStageCompleted,
       getMetric: (TaskInfo,TaskMetrics) => Option[Double])
     : Option[Distribution] = {
     Distribution(stage.stage.taskInfos.flatMap {
@@ -128,8 +127,7 @@ object StatsReportListener extends Logging {
   }
 
   //is there some way to setup the types that I can get rid of this completely?
-  def extractLongDistribution(
-      stage:SparkListenerStageCompleted,
+  def extractLongDistribution(stage: SparkListenerStageCompleted,
       getMetric: (TaskInfo,TaskMetrics) => Option[Long])
     : Option[Distribution] = {
     extractDoubleDistribution(stage, (info, metric) => getMetric(info,metric).map{_.toDouble})