Posted to commits@spark.apache.org by an...@apache.org on 2016/04/04 22:26:33 UTC

spark git commit: [SPARK-14358] Change SparkListener from a trait to an abstract class

Repository: spark
Updated Branches:
  refs/heads/master 27dad6f65 -> 714390470


[SPARK-14358] Change SparkListener from a trait to an abstract class

## What changes were proposed in this pull request?
It is difficult to maintain binary compatibility for Scala traits, and as a result we had to introduce JavaSparkListener. In Spark 2.0 we can change SparkListener from a trait to an abstract class and then remove JavaSparkListener.
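
For illustration, a minimal sketch of a listener written against the new abstract class (the listener name and timing logic here are hypothetical, not part of this patch). Because SparkListener now provides no-op defaults for every callback, clients only override the events they care about, and Java clients can extend SparkListener directly instead of JavaSparkListener:

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd, SparkListenerJobStart}

// Hypothetical example listener: times each job by overriding only two callbacks.
class JobTimingListener extends SparkListener {
  private var jobStartTime = 0L

  override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
    jobStartTime = System.currentTimeMillis()
  }

  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
    val elapsed = System.currentTimeMillis() - jobStartTime
    println(s"Job ${jobEnd.jobId} finished in $elapsed ms")
  }
}

// Registered the usual way, e.g. sc.addSparkListener(new JobTimingListener)
```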

## How was this patch tested?
Updated related unit tests.

Author: Reynold Xin <rx...@databricks.com>

Closes #12142 from rxin/SPARK-14358.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/71439047
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/71439047
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/71439047

Branch: refs/heads/master
Commit: 7143904700435265975d36f073cce2833467e121
Parents: 27dad6f
Author: Reynold Xin <rx...@databricks.com>
Authored: Mon Apr 4 13:26:18 2016 -0700
Committer: Andrew Or <an...@databricks.com>
Committed: Mon Apr 4 13:26:18 2016 -0700

----------------------------------------------------------------------
 .../org/apache/spark/JavaSparkListener.java     |  88 -------
 .../org/apache/spark/SparkFirehoseListener.java |   2 +-
 .../org/apache/spark/HeartbeatReceiver.scala    |   2 +-
 .../apache/spark/scheduler/SparkListener.scala  | 251 +++++--------------
 .../spark/scheduler/StatsReportListener.scala   | 199 +++++++++++++++
 project/MimaExcludes.scala                      |  11 +-
 .../ui/StreamingJobProgressListener.scala       |   2 +-
 7 files changed, 276 insertions(+), 279 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/71439047/core/src/main/java/org/apache/spark/JavaSparkListener.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/JavaSparkListener.java b/core/src/main/java/org/apache/spark/JavaSparkListener.java
deleted file mode 100644
index 23bc9a2..0000000
--- a/core/src/main/java/org/apache/spark/JavaSparkListener.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark;
-
-import org.apache.spark.scheduler.*;
-
-/**
- * Java clients should extend this class instead of implementing
- * SparkListener directly. This is to prevent java clients
- * from breaking when new events are added to the SparkListener
- * trait.
- *
- * This is a concrete class instead of abstract to enforce
- * new events get added to both the SparkListener and this adapter
- * in lockstep.
- */
-public class JavaSparkListener implements SparkListener {
-
-  @Override
-  public void onStageCompleted(SparkListenerStageCompleted stageCompleted) { }
-
-  @Override
-  public void onStageSubmitted(SparkListenerStageSubmitted stageSubmitted) { }
-
-  @Override
-  public void onTaskStart(SparkListenerTaskStart taskStart) { }
-
-  @Override
-  public void onTaskGettingResult(SparkListenerTaskGettingResult taskGettingResult) { }
-
-  @Override
-  public void onTaskEnd(SparkListenerTaskEnd taskEnd) { }
-
-  @Override
-  public void onJobStart(SparkListenerJobStart jobStart) { }
-
-  @Override
-  public void onJobEnd(SparkListenerJobEnd jobEnd) { }
-
-  @Override
-  public void onEnvironmentUpdate(SparkListenerEnvironmentUpdate environmentUpdate) { }
-
-  @Override
-  public void onBlockManagerAdded(SparkListenerBlockManagerAdded blockManagerAdded) { }
-
-  @Override
-  public void onBlockManagerRemoved(SparkListenerBlockManagerRemoved blockManagerRemoved) { }
-
-  @Override
-  public void onUnpersistRDD(SparkListenerUnpersistRDD unpersistRDD) { }
-
-  @Override
-  public void onApplicationStart(SparkListenerApplicationStart applicationStart) { }
-
-  @Override
-  public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) { }
-
-  @Override
-  public void onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate executorMetricsUpdate) { }
-
-  @Override
-  public void onExecutorAdded(SparkListenerExecutorAdded executorAdded) { }
-
-  @Override
-  public void onExecutorRemoved(SparkListenerExecutorRemoved executorRemoved) { }
-
-  @Override
-  public void onBlockUpdated(SparkListenerBlockUpdated blockUpdated) { }
-
-  @Override
-  public void onOtherEvent(SparkListenerEvent event) { }
-
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/71439047/core/src/main/java/org/apache/spark/SparkFirehoseListener.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/SparkFirehoseListener.java b/core/src/main/java/org/apache/spark/SparkFirehoseListener.java
index e6b24af..97eed61 100644
--- a/core/src/main/java/org/apache/spark/SparkFirehoseListener.java
+++ b/core/src/main/java/org/apache/spark/SparkFirehoseListener.java
@@ -28,7 +28,7 @@ import org.apache.spark.scheduler.*;
  * this was a concrete Scala class, default implementations of new event handlers would be inherited
  * from the SparkListener trait).
  */
-public class SparkFirehoseListener implements SparkListener {
+public class SparkFirehoseListener implements SparkListenerInterface {
 
     public void onEvent(SparkListenerEvent event) { }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/71439047/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
index 61f689e..2bdbd3f 100644
--- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
+++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
@@ -56,7 +56,7 @@ private[spark] case class HeartbeatResponse(reregisterBlockManager: Boolean)
  * Lives in the driver to receive heartbeats from executors..
  */
 private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
-  extends ThreadSafeRpcEndpoint with SparkListener with Logging {
+  extends SparkListener with ThreadSafeRpcEndpoint with Logging {
 
   def this(sc: SparkContext) {
     this(sc, new SystemClock)

http://git-wip-us.apache.org/repos/asf/spark/blob/71439047/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index 586173f..080ea6c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -151,275 +151,152 @@ private[spark] trait SparkHistoryListenerFactory {
   def createListeners(conf: SparkConf, sparkUI: SparkUI): Seq[SparkListener]
 }
 
+
 /**
- * :: DeveloperApi ::
- * Interface for listening to events from the Spark scheduler. Note that this is an internal
- * interface which might change in different Spark releases. Java clients should extend
- * {@link JavaSparkListener}
+ * Interface for listening to events from the Spark scheduler. Most applications should probably
+ * extend SparkListener or SparkFirehoseListener directly, rather than implementing this class.
+ *
+ * Note that this is an internal interface which might change in different Spark releases.
  */
-@DeveloperApi
-trait SparkListener {
+private[spark] trait SparkListenerInterface {
+
   /**
    * Called when a stage completes successfully or fails, with information on the completed stage.
    */
-  def onStageCompleted(stageCompleted: SparkListenerStageCompleted) { }
+  def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit
 
   /**
    * Called when a stage is submitted
    */
-  def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) { }
+  def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit
 
   /**
    * Called when a task starts
    */
-  def onTaskStart(taskStart: SparkListenerTaskStart) { }
+  def onTaskStart(taskStart: SparkListenerTaskStart): Unit
 
   /**
    * Called when a task begins remotely fetching its result (will not be called for tasks that do
    * not need to fetch the result remotely).
    */
-  def onTaskGettingResult(taskGettingResult: SparkListenerTaskGettingResult) { }
+  def onTaskGettingResult(taskGettingResult: SparkListenerTaskGettingResult): Unit
 
   /**
    * Called when a task ends
    */
-  def onTaskEnd(taskEnd: SparkListenerTaskEnd) { }
+  def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit
 
   /**
    * Called when a job starts
    */
-  def onJobStart(jobStart: SparkListenerJobStart) { }
+  def onJobStart(jobStart: SparkListenerJobStart): Unit
 
   /**
    * Called when a job ends
    */
-  def onJobEnd(jobEnd: SparkListenerJobEnd) { }
+  def onJobEnd(jobEnd: SparkListenerJobEnd): Unit
 
   /**
    * Called when environment properties have been updated
    */
-  def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate) { }
+  def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate): Unit
 
   /**
    * Called when a new block manager has joined
    */
-  def onBlockManagerAdded(blockManagerAdded: SparkListenerBlockManagerAdded) { }
+  def onBlockManagerAdded(blockManagerAdded: SparkListenerBlockManagerAdded): Unit
 
   /**
    * Called when an existing block manager has been removed
    */
-  def onBlockManagerRemoved(blockManagerRemoved: SparkListenerBlockManagerRemoved) { }
+  def onBlockManagerRemoved(blockManagerRemoved: SparkListenerBlockManagerRemoved): Unit
 
   /**
    * Called when an RDD is manually unpersisted by the application
    */
-  def onUnpersistRDD(unpersistRDD: SparkListenerUnpersistRDD) { }
+  def onUnpersistRDD(unpersistRDD: SparkListenerUnpersistRDD): Unit
 
   /**
    * Called when the application starts
    */
-  def onApplicationStart(applicationStart: SparkListenerApplicationStart) { }
+  def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit
 
   /**
    * Called when the application ends
    */
-  def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) { }
+  def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit
 
   /**
    * Called when the driver receives task metrics from an executor in a heartbeat.
    */
-  def onExecutorMetricsUpdate(executorMetricsUpdate: SparkListenerExecutorMetricsUpdate) { }
+  def onExecutorMetricsUpdate(executorMetricsUpdate: SparkListenerExecutorMetricsUpdate): Unit
 
   /**
    * Called when the driver registers a new executor.
    */
-  def onExecutorAdded(executorAdded: SparkListenerExecutorAdded) { }
+  def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit
 
   /**
    * Called when the driver removes an executor.
    */
-  def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved) { }
+  def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit
 
   /**
    * Called when the driver receives a block update info.
    */
-  def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated) { }
+  def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit
 
   /**
    * Called when other events like SQL-specific events are posted.
    */
-  def onOtherEvent(event: SparkListenerEvent) { }
+  def onOtherEvent(event: SparkListenerEvent): Unit
 }
 
+
 /**
  * :: DeveloperApi ::
- * Simple SparkListener that logs a few summary statistics when each stage completes
+ * A default implementation for [[SparkListenerInterface]] that has no-op implementations for
+ * all callbacks.
+ *
+ * Note that this is an internal interface which might change in different Spark releases.
  */
 @DeveloperApi
-class StatsReportListener extends SparkListener with Logging {
-
-  import org.apache.spark.scheduler.StatsReportListener._
-
-  private val taskInfoMetrics = mutable.Buffer[(TaskInfo, TaskMetrics)]()
-
-  override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
-    val info = taskEnd.taskInfo
-    val metrics = taskEnd.taskMetrics
-    if (info != null && metrics != null) {
-      taskInfoMetrics += ((info, metrics))
-    }
-  }
-
-  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) {
-    implicit val sc = stageCompleted
-    this.logInfo(s"Finished stage: ${getStatusDetail(stageCompleted.stageInfo)}")
-    showMillisDistribution("task runtime:", (info, _) => Some(info.duration), taskInfoMetrics)
-
-    // Shuffle write
-    showBytesDistribution("shuffle bytes written:",
-      (_, metric) => metric.shuffleWriteMetrics.map(_.bytesWritten), taskInfoMetrics)
-
-    // Fetch & I/O
-    showMillisDistribution("fetch wait time:",
-      (_, metric) => metric.shuffleReadMetrics.map(_.fetchWaitTime), taskInfoMetrics)
-    showBytesDistribution("remote bytes read:",
-      (_, metric) => metric.shuffleReadMetrics.map(_.remoteBytesRead), taskInfoMetrics)
-    showBytesDistribution("task result size:",
-      (_, metric) => Some(metric.resultSize), taskInfoMetrics)
-
-    // Runtime breakdown
-    val runtimePcts = taskInfoMetrics.map { case (info, metrics) =>
-      RuntimePercentage(info.duration, metrics)
-    }
-    showDistribution("executor (non-fetch) time pct: ",
-      Distribution(runtimePcts.map(_.executorPct * 100)), "%2.0f %%")
-    showDistribution("fetch wait time pct: ",
-      Distribution(runtimePcts.flatMap(_.fetchPct.map(_ * 100))), "%2.0f %%")
-    showDistribution("other time pct: ", Distribution(runtimePcts.map(_.other * 100)), "%2.0f %%")
-    taskInfoMetrics.clear()
-  }
-
-  private def getStatusDetail(info: StageInfo): String = {
-    val failureReason = info.failureReason.map("(" + _ + ")").getOrElse("")
-    val timeTaken = info.submissionTime.map(
-      x => info.completionTime.getOrElse(System.currentTimeMillis()) - x
-    ).getOrElse("-")
-
-    s"Stage(${info.stageId}, ${info.attemptId}); Name: '${info.name}'; " +
-    s"Status: ${info.getStatusString}$failureReason; numTasks: ${info.numTasks}; " +
-    s"Took: $timeTaken msec"
-  }
+abstract class SparkListener extends SparkListenerInterface {
+  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = { }
 
-}
+  override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = { }
 
-private[spark] object StatsReportListener extends Logging {
-
-  // For profiling, the extremes are more interesting
-  val percentiles = Array[Int](0, 5, 10, 25, 50, 75, 90, 95, 100)
-  val probabilities = percentiles.map(_ / 100.0)
-  val percentilesHeader = "\t" + percentiles.mkString("%\t") + "%"
-
-  def extractDoubleDistribution(
-      taskInfoMetrics: Seq[(TaskInfo, TaskMetrics)],
-      getMetric: (TaskInfo, TaskMetrics) => Option[Double]): Option[Distribution] = {
-    Distribution(taskInfoMetrics.flatMap { case (info, metric) => getMetric(info, metric) })
-  }
-
-  // Is there some way to setup the types that I can get rid of this completely?
-  def extractLongDistribution(
-      taskInfoMetrics: Seq[(TaskInfo, TaskMetrics)],
-      getMetric: (TaskInfo, TaskMetrics) => Option[Long]): Option[Distribution] = {
-    extractDoubleDistribution(
-      taskInfoMetrics,
-      (info, metric) => { getMetric(info, metric).map(_.toDouble) })
-  }
-
-  def showDistribution(heading: String, d: Distribution, formatNumber: Double => String) {
-    val stats = d.statCounter
-    val quantiles = d.getQuantiles(probabilities).map(formatNumber)
-    logInfo(heading + stats)
-    logInfo(percentilesHeader)
-    logInfo("\t" + quantiles.mkString("\t"))
-  }
-
-  def showDistribution(
-      heading: String,
-      dOpt: Option[Distribution],
-      formatNumber: Double => String) {
-    dOpt.foreach { d => showDistribution(heading, d, formatNumber)}
-  }
-
-  def showDistribution(heading: String, dOpt: Option[Distribution], format: String) {
-    def f(d: Double): String = format.format(d)
-    showDistribution(heading, dOpt, f _)
-  }
-
-  def showDistribution(
-      heading: String,
-      format: String,
-      getMetric: (TaskInfo, TaskMetrics) => Option[Double],
-      taskInfoMetrics: Seq[(TaskInfo, TaskMetrics)]) {
-    showDistribution(heading, extractDoubleDistribution(taskInfoMetrics, getMetric), format)
-  }
-
-  def showBytesDistribution(
-      heading: String,
-      getMetric: (TaskInfo, TaskMetrics) => Option[Long],
-      taskInfoMetrics: Seq[(TaskInfo, TaskMetrics)]) {
-    showBytesDistribution(heading, extractLongDistribution(taskInfoMetrics, getMetric))
-  }
-
-  def showBytesDistribution(heading: String, dOpt: Option[Distribution]) {
-    dOpt.foreach { dist => showBytesDistribution(heading, dist) }
-  }
-
-  def showBytesDistribution(heading: String, dist: Distribution) {
-    showDistribution(heading, dist, (d => Utils.bytesToString(d.toLong)): Double => String)
-  }
-
-  def showMillisDistribution(heading: String, dOpt: Option[Distribution]) {
-    showDistribution(heading, dOpt,
-      (d => StatsReportListener.millisToString(d.toLong)): Double => String)
-  }
-
-  def showMillisDistribution(
-      heading: String,
-      getMetric: (TaskInfo, TaskMetrics) => Option[Long],
-      taskInfoMetrics: Seq[(TaskInfo, TaskMetrics)]) {
-    showMillisDistribution(heading, extractLongDistribution(taskInfoMetrics, getMetric))
-  }
-
-  val seconds = 1000L
-  val minutes = seconds * 60
-  val hours = minutes * 60
+  override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = { }
 
-  /**
-   * Reformat a time interval in milliseconds to a prettier format for output
-   */
-  def millisToString(ms: Long): String = {
-    val (size, units) =
-      if (ms > hours) {
-        (ms.toDouble / hours, "hours")
-      } else if (ms > minutes) {
-        (ms.toDouble / minutes, "min")
-      } else if (ms > seconds) {
-        (ms.toDouble / seconds, "s")
-      } else {
-        (ms.toDouble, "ms")
-      }
-    "%.1f %s".format(size, units)
-  }
-}
+  override def onTaskGettingResult(taskGettingResult: SparkListenerTaskGettingResult): Unit = { }
+
+  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = { }
+
+  override def onJobStart(jobStart: SparkListenerJobStart): Unit = { }
+
+  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = { }
+
+  override def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate): Unit = { }
+
+  override def onBlockManagerAdded(blockManagerAdded: SparkListenerBlockManagerAdded): Unit = { }
+
+  override def onBlockManagerRemoved(
+      blockManagerRemoved: SparkListenerBlockManagerRemoved): Unit = { }
+
+  override def onUnpersistRDD(unpersistRDD: SparkListenerUnpersistRDD): Unit = { }
+
+  override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = { }
+
+  override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = { }
+
+  override def onExecutorMetricsUpdate(
+      executorMetricsUpdate: SparkListenerExecutorMetricsUpdate): Unit = { }
+
+  override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = { }
+
+  override def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit = { }
+
+  override def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit = { }
 
-private case class RuntimePercentage(executorPct: Double, fetchPct: Option[Double], other: Double)
-
-private object RuntimePercentage {
-  def apply(totalTime: Long, metrics: TaskMetrics): RuntimePercentage = {
-    val denom = totalTime.toDouble
-    val fetchTime = metrics.shuffleReadMetrics.map(_.fetchWaitTime)
-    val fetch = fetchTime.map(_ / denom)
-    val exec = (metrics.executorRunTime - fetchTime.getOrElse(0L)) / denom
-    val other = 1.0 - (exec + fetch.getOrElse(0d))
-    RuntimePercentage(exec, fetch, other)
-  }
+  override def onOtherEvent(event: SparkListenerEvent): Unit = { }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/71439047/core/src/main/scala/org/apache/spark/scheduler/StatsReportListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/StatsReportListener.scala b/core/src/main/scala/org/apache/spark/scheduler/StatsReportListener.scala
new file mode 100644
index 0000000..309f4b8
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/StatsReportListener.scala
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.{Distribution, Utils}
+
+
+/**
+ * :: DeveloperApi ::
+ * Simple SparkListener that logs a few summary statistics when each stage completes.
+ */
+@DeveloperApi
+class StatsReportListener extends SparkListener with Logging {
+
+  import org.apache.spark.scheduler.StatsReportListener._
+
+  private val taskInfoMetrics = mutable.Buffer[(TaskInfo, TaskMetrics)]()
+
+  override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
+    val info = taskEnd.taskInfo
+    val metrics = taskEnd.taskMetrics
+    if (info != null && metrics != null) {
+      taskInfoMetrics += ((info, metrics))
+    }
+  }
+
+  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) {
+    implicit val sc = stageCompleted
+    this.logInfo(s"Finished stage: ${getStatusDetail(stageCompleted.stageInfo)}")
+    showMillisDistribution("task runtime:", (info, _) => Some(info.duration), taskInfoMetrics)
+
+    // Shuffle write
+    showBytesDistribution("shuffle bytes written:",
+      (_, metric) => metric.shuffleWriteMetrics.map(_.bytesWritten), taskInfoMetrics)
+
+    // Fetch & I/O
+    showMillisDistribution("fetch wait time:",
+      (_, metric) => metric.shuffleReadMetrics.map(_.fetchWaitTime), taskInfoMetrics)
+    showBytesDistribution("remote bytes read:",
+      (_, metric) => metric.shuffleReadMetrics.map(_.remoteBytesRead), taskInfoMetrics)
+    showBytesDistribution("task result size:",
+      (_, metric) => Some(metric.resultSize), taskInfoMetrics)
+
+    // Runtime breakdown
+    val runtimePcts = taskInfoMetrics.map { case (info, metrics) =>
+      RuntimePercentage(info.duration, metrics)
+    }
+    showDistribution("executor (non-fetch) time pct: ",
+      Distribution(runtimePcts.map(_.executorPct * 100)), "%2.0f %%")
+    showDistribution("fetch wait time pct: ",
+      Distribution(runtimePcts.flatMap(_.fetchPct.map(_ * 100))), "%2.0f %%")
+    showDistribution("other time pct: ", Distribution(runtimePcts.map(_.other * 100)), "%2.0f %%")
+    taskInfoMetrics.clear()
+  }
+
+  private def getStatusDetail(info: StageInfo): String = {
+    val failureReason = info.failureReason.map("(" + _ + ")").getOrElse("")
+    val timeTaken = info.submissionTime.map(
+      x => info.completionTime.getOrElse(System.currentTimeMillis()) - x
+    ).getOrElse("-")
+
+    s"Stage(${info.stageId}, ${info.attemptId}); Name: '${info.name}'; " +
+      s"Status: ${info.getStatusString}$failureReason; numTasks: ${info.numTasks}; " +
+      s"Took: $timeTaken msec"
+  }
+
+}
+
+private[spark] object StatsReportListener extends Logging {
+
+  // For profiling, the extremes are more interesting
+  val percentiles = Array[Int](0, 5, 10, 25, 50, 75, 90, 95, 100)
+  val probabilities = percentiles.map(_ / 100.0)
+  val percentilesHeader = "\t" + percentiles.mkString("%\t") + "%"
+
+  def extractDoubleDistribution(
+    taskInfoMetrics: Seq[(TaskInfo, TaskMetrics)],
+    getMetric: (TaskInfo, TaskMetrics) => Option[Double]): Option[Distribution] = {
+    Distribution(taskInfoMetrics.flatMap { case (info, metric) => getMetric(info, metric) })
+  }
+
+  // Is there some way to setup the types that I can get rid of this completely?
+  def extractLongDistribution(
+    taskInfoMetrics: Seq[(TaskInfo, TaskMetrics)],
+    getMetric: (TaskInfo, TaskMetrics) => Option[Long]): Option[Distribution] = {
+    extractDoubleDistribution(
+      taskInfoMetrics,
+      (info, metric) => { getMetric(info, metric).map(_.toDouble) })
+  }
+
+  def showDistribution(heading: String, d: Distribution, formatNumber: Double => String) {
+    val stats = d.statCounter
+    val quantiles = d.getQuantiles(probabilities).map(formatNumber)
+    logInfo(heading + stats)
+    logInfo(percentilesHeader)
+    logInfo("\t" + quantiles.mkString("\t"))
+  }
+
+  def showDistribution(
+    heading: String,
+    dOpt: Option[Distribution],
+    formatNumber: Double => String) {
+    dOpt.foreach { d => showDistribution(heading, d, formatNumber)}
+  }
+
+  def showDistribution(heading: String, dOpt: Option[Distribution], format: String) {
+    def f(d: Double): String = format.format(d)
+    showDistribution(heading, dOpt, f _)
+  }
+
+  def showDistribution(
+    heading: String,
+    format: String,
+    getMetric: (TaskInfo, TaskMetrics) => Option[Double],
+    taskInfoMetrics: Seq[(TaskInfo, TaskMetrics)]) {
+    showDistribution(heading, extractDoubleDistribution(taskInfoMetrics, getMetric), format)
+  }
+
+  def showBytesDistribution(
+    heading: String,
+    getMetric: (TaskInfo, TaskMetrics) => Option[Long],
+    taskInfoMetrics: Seq[(TaskInfo, TaskMetrics)]) {
+    showBytesDistribution(heading, extractLongDistribution(taskInfoMetrics, getMetric))
+  }
+
+  def showBytesDistribution(heading: String, dOpt: Option[Distribution]) {
+    dOpt.foreach { dist => showBytesDistribution(heading, dist) }
+  }
+
+  def showBytesDistribution(heading: String, dist: Distribution) {
+    showDistribution(heading, dist, (d => Utils.bytesToString(d.toLong)): Double => String)
+  }
+
+  def showMillisDistribution(heading: String, dOpt: Option[Distribution]) {
+    showDistribution(heading, dOpt,
+      (d => StatsReportListener.millisToString(d.toLong)): Double => String)
+  }
+
+  def showMillisDistribution(
+    heading: String,
+    getMetric: (TaskInfo, TaskMetrics) => Option[Long],
+    taskInfoMetrics: Seq[(TaskInfo, TaskMetrics)]) {
+    showMillisDistribution(heading, extractLongDistribution(taskInfoMetrics, getMetric))
+  }
+
+  val seconds = 1000L
+  val minutes = seconds * 60
+  val hours = minutes * 60
+
+  /**
+   * Reformat a time interval in milliseconds to a prettier format for output
+   */
+  def millisToString(ms: Long): String = {
+    val (size, units) =
+      if (ms > hours) {
+        (ms.toDouble / hours, "hours")
+      } else if (ms > minutes) {
+        (ms.toDouble / minutes, "min")
+      } else if (ms > seconds) {
+        (ms.toDouble / seconds, "s")
+      } else {
+        (ms.toDouble, "ms")
+      }
+    "%.1f %s".format(size, units)
+  }
+}
+
+private case class RuntimePercentage(executorPct: Double, fetchPct: Option[Double], other: Double)
+
+private object RuntimePercentage {
+  def apply(totalTime: Long, metrics: TaskMetrics): RuntimePercentage = {
+    val denom = totalTime.toDouble
+    val fetchTime = metrics.shuffleReadMetrics.map(_.fetchWaitTime)
+    val fetch = fetchTime.map(_ / denom)
+    val exec = (metrics.executorRunTime - fetchTime.getOrElse(0L)) / denom
+    val other = 1.0 - (exec + fetch.getOrElse(0d))
+    RuntimePercentage(exec, fetch, other)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/71439047/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 2be490b..9f245af 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -66,7 +66,16 @@ object MimaExcludes {
         ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache")
       ) ++ Seq(
         ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.SparkContext.emptyRDD"),
-        ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.broadcast.HttpBroadcastFactory")
+        ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.broadcast.HttpBroadcastFactory"),
+        // SPARK-14358 SparkListener from trait to abstract class
+        ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.JavaSparkListener"),
+        ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.SparkFirehoseListener"),
+        ProblemFilters.exclude[IncompatibleTemplateDefProblem]("org.apache.spark.scheduler.SparkListener"),
+        ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.ui.jobs.JobProgressListener"),
+        ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.ui.exec.ExecutorsListener"),
+        ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.ui.env.EnvironmentListener"),
+        ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.ui.storage.StorageListener"),
+        ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.storage.StorageStatusListener")
       ) ++
       Seq(
         // SPARK-3369 Fix Iterable/Iterator in Java API

http://git-wip-us.apache.org/repos/asf/spark/blob/71439047/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
index 6985c37..c086df4 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
@@ -28,7 +28,7 @@ import org.apache.spark.streaming.{StreamingContext, Time}
 import org.apache.spark.streaming.scheduler._
 
 private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
-  extends StreamingListener with SparkListener {
+  extends SparkListener with StreamingListener {
 
   private val waitingBatchUIData = new HashMap[Time, BatchUIData]
   private val runningBatchUIData = new HashMap[Time, BatchUIData]

