You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2013/12/24 06:09:19 UTC

[2/9] git commit: Added StatsReportListener to generate processing time statistics across multiple batches.

Added StatsReportListener to generate processing time statistics across multiple batches.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/b80ec056
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/b80ec056
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/b80ec056

Branch: refs/heads/master
Commit: b80ec05635132f96772545803a10a1bbfa1250e7
Parents: 097e120
Author: Tathagata Das <ta...@gmail.com>
Authored: Wed Dec 18 15:35:24 2013 -0800
Committer: Tathagata Das <ta...@gmail.com>
Committed: Wed Dec 18 15:35:24 2013 -0800

----------------------------------------------------------------------
 .../apache/spark/scheduler/SparkListener.scala  |  5 +--
 .../streaming/scheduler/JobScheduler.scala      |  2 +-
 .../streaming/scheduler/StreamingListener.scala | 45 +++++++++++++++++++-
 3 files changed, 46 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b80ec056/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index 2c5d874..ee63b3c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -131,8 +131,8 @@ object StatsReportListener extends Logging {
 
   def showDistribution(heading: String, d: Distribution, formatNumber: Double => String) {
     val stats = d.statCounter
-    logInfo(heading + stats)
     val quantiles = d.getQuantiles(probabilities).map{formatNumber}
+    logInfo(heading + stats)
     logInfo(percentilesHeader)
     logInfo("\t" + quantiles.mkString("\t"))
   }
@@ -173,8 +173,6 @@ object StatsReportListener extends Logging {
     showMillisDistribution(heading, extractLongDistribution(stage, getMetric))
   }
 
-
-
   val seconds = 1000L
   val minutes = seconds * 60
   val hours = minutes * 60
@@ -198,7 +196,6 @@ object StatsReportListener extends Logging {
 }
 
 
-
 case class RuntimePercentage(executorPct: Double, fetchPct: Option[Double], other: Double)
 object RuntimePercentage {
   def apply(totalTime: Long, metrics: TaskMetrics): RuntimePercentage = {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b80ec056/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index 14906fd..69930f3 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -79,13 +79,13 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
     jobSet.afterJobStop(job)
     logInfo("Finished job " + job.id + " from job set of time " + jobSet.time)
     if (jobSet.hasCompleted) {
-      listenerBus.post(StreamingListenerBatchCompleted(jobSet.toBatchInfo()))
       jobSets.remove(jobSet.time)
       generator.onBatchCompletion(jobSet.time)
       logInfo("Total delay: %.3f s for time %s (execution: %.3f s)".format(
         jobSet.totalDelay / 1000.0, jobSet.time.toString,
         jobSet.processingDelay / 1000.0
       ))
+      listenerBus.post(StreamingListenerBatchCompleted(jobSet.toBatchInfo()))
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b80ec056/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala
index 49fd0d2..5647ffa 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala
@@ -17,14 +17,22 @@
 
 package org.apache.spark.streaming.scheduler
 
+import scala.collection.mutable.Queue
+import org.apache.spark.util.Distribution
+
+/** Base trait for events related to StreamingListener */
 sealed trait StreamingListenerEvent
 
 case class StreamingListenerBatchCompleted(batchInfo: BatchInfo) extends StreamingListenerEvent
 
 case class StreamingListenerBatchStarted(batchInfo: BatchInfo) extends StreamingListenerEvent
 
-trait StreamingListener {
 
+/**
+ * A listener interface for receiving information about an ongoing streaming
+ * computation.
+ */
+trait StreamingListener {
   /**
    * Called when processing of a batch has completed
    */
@@ -34,4 +42,39 @@ trait StreamingListener {
    * Called when processing of a batch has started
    */
   def onBatchStarted(batchStarted: StreamingListenerBatchStarted) { }
+}
+
+
+/**
+ * A simple StreamingListener that logs summary statistics across Spark Streaming batches
+ * @param numBatchInfos Number of last batches to consider for generating statistics (default: 10)
+ */
+class StatsReportListener(numBatchInfos: Int = 10) extends StreamingListener {
+
+  import org.apache.spark
+
+  val batchInfos = new Queue[BatchInfo]()
+
+  override def onBatchCompleted(batchStarted: StreamingListenerBatchCompleted) {
+    addToQueue(batchStarted.batchInfo)
+    printStats()
+  }
+
+  def addToQueue(newPoint: BatchInfo) {
+    batchInfos.enqueue(newPoint)
+    if (batchInfos.size > numBatchInfos) batchInfos.dequeue()
+  }
+
+  def printStats() {
+    showMillisDistribution("Total delay: ", _.totalDelay)
+    showMillisDistribution("Processing time: ", _.processingDelay)
+  }
+
+  def showMillisDistribution(heading: String, getMetric: BatchInfo => Option[Long]) {
+    spark.scheduler.StatsReportListener.showMillisDistribution(heading, extractDistribution(getMetric))
+  }
+
+  def extractDistribution(getMetric: BatchInfo => Option[Long]): Option[Distribution] = {
+    Distribution(batchInfos.flatMap(getMetric(_)).map(_.toDouble))
+  }
 }
\ No newline at end of file