You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ka...@apache.org on 2014/11/10 07:29:18 UTC

spark git commit: SPARK-3179. Add task OutputMetrics.

Repository: spark
Updated Branches:
  refs/heads/master f8e573230 -> 3c2cff4b9


SPARK-3179. Add task OutputMetrics.

Author: Sandy Ryza <sa...@cloudera.com>

This patch had conflicts when merged, resolved by
Committer: Kay Ousterhout <ka...@gmail.com>

Closes #2968 from sryza/sandy-spark-3179 and squashes the following commits:

dce4784 [Sandy Ryza] More review feedback
8d350d1 [Sandy Ryza] Fix test against Hadoop 2.5+
e7c74d0 [Sandy Ryza] More review feedback
6cff9c4 [Sandy Ryza] Review feedback
fb2dde0 [Sandy Ryza] SPARK-3179


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3c2cff4b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3c2cff4b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3c2cff4b

Branch: refs/heads/master
Commit: 3c2cff4b9464f8d7535564fcd194631a8e5bb0a5
Parents: f8e5732
Author: Sandy Ryza <sa...@cloudera.com>
Authored: Sun Nov 9 22:29:03 2014 -0800
Committer: Kay Ousterhout <ka...@gmail.com>
Committed: Sun Nov 9 22:29:03 2014 -0800

----------------------------------------------------------------------
 .../apache/spark/deploy/SparkHadoopUtil.scala   |  46 +++++--
 .../org/apache/spark/executor/TaskMetrics.scala |  28 +++++
 .../org/apache/spark/rdd/PairRDDFunctions.scala |  51 +++++++-
 .../org/apache/spark/scheduler/JobLogger.scala  |   7 +-
 .../scala/org/apache/spark/ui/ToolTips.scala    |   2 +
 .../org/apache/spark/ui/exec/ExecutorsTab.scala |   5 +
 .../apache/spark/ui/jobs/ExecutorTable.scala    |   3 +
 .../spark/ui/jobs/JobProgressListener.scala     |   6 +
 .../org/apache/spark/ui/jobs/StagePage.scala    |  29 ++++-
 .../org/apache/spark/ui/jobs/StageTable.scala   |   4 +
 .../scala/org/apache/spark/ui/jobs/UIData.scala |   2 +
 .../org/apache/spark/util/JsonProtocol.scala    |  21 +++-
 .../spark/metrics/InputMetricsSuite.scala       |  76 ------------
 .../spark/metrics/InputOutputMetricsSuite.scala | 109 ++++++++++++++++
 .../spark/scheduler/SparkListenerSuite.scala    |   1 +
 .../ui/jobs/JobProgressListenerSuite.scala      |   7 ++
 .../apache/spark/util/JsonProtocolSuite.scala   | 124 +++++++++++++++++--
 17 files changed, 418 insertions(+), 103 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3c2cff4b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index e28eaad..60ee115 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.deploy
 
+import java.lang.reflect.Method
 import java.security.PrivilegedExceptionAction
 
 import org.apache.hadoop.conf.Configuration
@@ -133,14 +134,9 @@ class SparkHadoopUtil extends Logging {
    */
   private[spark] def getFSBytesReadOnThreadCallback(path: Path, conf: Configuration)
     : Option[() => Long] = {
-    val qualifiedPath = path.getFileSystem(conf).makeQualified(path)
-    val scheme = qualifiedPath.toUri().getScheme()
-    val stats = FileSystem.getAllStatistics().filter(_.getScheme().equals(scheme))
     try {
-      val threadStats = stats.map(Utils.invoke(classOf[Statistics], _, "getThreadStatistics"))
-      val statisticsDataClass =
-        Class.forName("org.apache.hadoop.fs.FileSystem$Statistics$StatisticsData")
-      val getBytesReadMethod = statisticsDataClass.getDeclaredMethod("getBytesRead")
+      val threadStats = getFileSystemThreadStatistics(path, conf)
+      val getBytesReadMethod = getFileSystemThreadStatisticsMethod("getBytesRead")
       val f = () => threadStats.map(getBytesReadMethod.invoke(_).asInstanceOf[Long]).sum
       val baselineBytesRead = f()
       Some(() => f() - baselineBytesRead)
@@ -151,6 +147,42 @@ class SparkHadoopUtil extends Logging {
       }
     }
   }
+
+  /**
+   * Returns a function that can be called to find Hadoop FileSystem bytes written. If
+   * getFSBytesWrittenOnThreadCallback is called from thread r at time t, the returned callback will
+   * return the bytes written on r since t.  Reflection is required because thread-level FileSystem
+   * statistics are only available as of Hadoop 2.5 (see HADOOP-10688).
+   * Returns None if the required method can't be found.
+   */
+  private[spark] def getFSBytesWrittenOnThreadCallback(path: Path, conf: Configuration)
+    : Option[() => Long] = {
+    try {
+      val threadStats = getFileSystemThreadStatistics(path, conf)
+      val getBytesWrittenMethod = getFileSystemThreadStatisticsMethod("getBytesWritten")
+      val f = () => threadStats.map(getBytesWrittenMethod.invoke(_).asInstanceOf[Long]).sum
+      val baselineBytesWritten = f()
+      Some(() => f() - baselineBytesWritten)
+    } catch {
+      case e: NoSuchMethodException => {
+        logDebug("Couldn't find method for retrieving thread-level FileSystem output data", e)
+        None
+      }
+    }
+  }
+
+  private def getFileSystemThreadStatistics(path: Path, conf: Configuration): Seq[AnyRef] = {
+    val qualifiedPath = path.getFileSystem(conf).makeQualified(path)
+    val scheme = qualifiedPath.toUri().getScheme()
+    val stats = FileSystem.getAllStatistics().filter(_.getScheme().equals(scheme))
+    stats.map(Utils.invoke(classOf[Statistics], _, "getThreadStatistics"))
+  }
+
+  private def getFileSystemThreadStatisticsMethod(methodName: String): Method = {
+    val statisticsDataClass =
+      Class.forName("org.apache.hadoop.fs.FileSystem$Statistics$StatisticsData")
+    statisticsDataClass.getDeclaredMethod(methodName)
+  }
 }
 
 object SparkHadoopUtil {

http://git-wip-us.apache.org/repos/asf/spark/blob/3c2cff4b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index 57bc2b4..51b5328 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -83,6 +83,12 @@ class TaskMetrics extends Serializable {
   var inputMetrics: Option[InputMetrics] = None
 
   /**
+   * If this task writes data externally (e.g. to a distributed filesystem), metrics on how much
+   * data was written are stored here.
+   */
+  var outputMetrics: Option[OutputMetrics] = None
+
+  /**
    * If this task reads from shuffle output, metrics on getting shuffle data will be collected here.
    * This includes read metrics aggregated over all the task's shuffle dependencies.
    */
@@ -159,6 +165,16 @@ object DataReadMethod extends Enumeration with Serializable {
 
 /**
  * :: DeveloperApi ::
+ * Method by which output data was written.
+ */
+@DeveloperApi
+object DataWriteMethod extends Enumeration with Serializable {
+  type DataWriteMethod = Value
+  val Hadoop = Value
+}
+
+/**
+ * :: DeveloperApi ::
  * Metrics about reading input data.
  */
 @DeveloperApi
@@ -171,6 +187,18 @@ case class InputMetrics(readMethod: DataReadMethod.Value) {
 
 /**
  * :: DeveloperApi ::
+ * Metrics about writing output data.
+ */
+@DeveloperApi
+case class OutputMetrics(writeMethod: DataWriteMethod.Value) {
+  /**
+   * Total bytes written
+   */
+  var bytesWritten: Long = 0L
+}
+
+/**
+ * :: DeveloperApi ::
  * Metrics pertaining to shuffle data read in a given task.
  */
 @DeveloperApi

http://git-wip-us.apache.org/repos/asf/spark/blob/3c2cff4b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 462f0d6..8c2c959 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -28,7 +28,7 @@ import scala.reflect.ClassTag
 
 import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus
 import org.apache.hadoop.conf.{Configurable, Configuration}
-import org.apache.hadoop.fs.FileSystem
+import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.io.SequenceFile.CompressionType
 import org.apache.hadoop.io.compress.CompressionCodec
 import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf, OutputFormat}
@@ -40,6 +40,7 @@ import org.apache.spark.Partitioner.defaultPartitioner
 import org.apache.spark.SparkContext._
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.executor.{DataWriteMethod, OutputMetrics}
 import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
 import org.apache.spark.partial.{BoundedDouble, PartialResult}
 import org.apache.spark.serializer.Serializer
@@ -962,30 +963,40 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     }
 
     val writeShard = (context: TaskContext, iter: Iterator[(K,V)]) => {
+      val config = wrappedConf.value
       // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
       // around by taking a mod. We expect that no task will be attempted 2 billion times.
       val attemptNumber = (context.attemptId % Int.MaxValue).toInt
       /* "reduce task" <split #> <attempt # = spark task #> */
       val attemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = false, context.partitionId,
         attemptNumber)
-      val hadoopContext = newTaskAttemptContext(wrappedConf.value, attemptId)
+      val hadoopContext = newTaskAttemptContext(config, attemptId)
       val format = outfmt.newInstance
       format match {
-        case c: Configurable => c.setConf(wrappedConf.value)
+        case c: Configurable => c.setConf(config)
         case _ => ()
       }
       val committer = format.getOutputCommitter(hadoopContext)
       committer.setupTask(hadoopContext)
+
+      val (outputMetrics, bytesWrittenCallback) = initHadoopOutputMetrics(context, config)
+
       val writer = format.getRecordWriter(hadoopContext).asInstanceOf[NewRecordWriter[K,V]]
       try {
+        var recordsWritten = 0L
         while (iter.hasNext) {
           val pair = iter.next()
           writer.write(pair._1, pair._2)
+
+          // Update bytes written metric every few records
+          maybeUpdateOutputMetrics(bytesWrittenCallback, outputMetrics, recordsWritten)
+          recordsWritten += 1
         }
       } finally {
         writer.close(hadoopContext)
       }
       committer.commitTask(hadoopContext)
+      bytesWrittenCallback.foreach { fn => outputMetrics.bytesWritten = fn() }
       1
     } : Int
 
@@ -1006,6 +1017,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
   def saveAsHadoopDataset(conf: JobConf) {
     // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
     val hadoopConf = conf
+    val wrappedConf = new SerializableWritable(hadoopConf)
     val outputFormatInstance = hadoopConf.getOutputFormat
     val keyClass = hadoopConf.getOutputKeyClass
     val valueClass = hadoopConf.getOutputValueClass
@@ -1033,27 +1045,56 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     writer.preSetup()
 
     val writeToFile = (context: TaskContext, iter: Iterator[(K, V)]) => {
+      val config = wrappedConf.value
       // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
       // around by taking a mod. We expect that no task will be attempted 2 billion times.
       val attemptNumber = (context.attemptId % Int.MaxValue).toInt
 
+      val (outputMetrics, bytesWrittenCallback) = initHadoopOutputMetrics(context, config)
+
       writer.setup(context.stageId, context.partitionId, attemptNumber)
       writer.open()
       try {
+        var recordsWritten = 0L
         while (iter.hasNext) {
           val record = iter.next()
           writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef])
+
+          // Update bytes written metric every few records
+          maybeUpdateOutputMetrics(bytesWrittenCallback, outputMetrics, recordsWritten)
+          recordsWritten += 1
         }
       } finally {
         writer.close()
       }
       writer.commit()
+      bytesWrittenCallback.foreach { fn => outputMetrics.bytesWritten = fn() }
     }
 
     self.context.runJob(self, writeToFile)
     writer.commitJob()
   }
 
+  private def initHadoopOutputMetrics(context: TaskContext, config: Configuration)
+    : (OutputMetrics, Option[() => Long]) = {
+    val bytesWrittenCallback = Option(config.get("mapreduce.output.fileoutputformat.outputdir"))
+      .map(new Path(_))
+      .flatMap(SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback(_, config))
+    val outputMetrics = new OutputMetrics(DataWriteMethod.Hadoop)
+    if (bytesWrittenCallback.isDefined) {
+      context.taskMetrics.outputMetrics = Some(outputMetrics)
+    }
+    (outputMetrics, bytesWrittenCallback)
+  }
+
+  private def maybeUpdateOutputMetrics(bytesWrittenCallback: Option[() => Long],
+      outputMetrics: OutputMetrics, recordsWritten: Long): Unit = {
+    if (recordsWritten % PairRDDFunctions.RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES == 0
+        && bytesWrittenCallback.isDefined) {
+      bytesWrittenCallback.foreach { fn => outputMetrics.bytesWritten = fn() }
+    }
+  }
+
   /**
    * Return an RDD with the keys of each tuple.
    */
@@ -1070,3 +1111,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
 
   private[spark] def keyOrdering: Option[Ordering[K]] = Option(ord)
 }
+
+private[spark] object PairRDDFunctions {
+  val RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES = 256
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/3c2cff4b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
index 4e3d9de..3bb5485 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
@@ -158,6 +158,11 @@ class JobLogger(val user: String, val logDirName: String) extends SparkListener
         " INPUT_BYTES=" + metrics.bytesRead
       case None => ""
     }
+    val outputMetrics = taskMetrics.outputMetrics match {
+      case Some(metrics) =>
+        " OUTPUT_BYTES=" + metrics.bytesWritten
+      case None => ""
+    }
     val shuffleReadMetrics = taskMetrics.shuffleReadMetrics match {
       case Some(metrics) =>
         " BLOCK_FETCHED_TOTAL=" + metrics.totalBlocksFetched +
@@ -173,7 +178,7 @@ class JobLogger(val user: String, val logDirName: String) extends SparkListener
         " SHUFFLE_WRITE_TIME=" + metrics.shuffleWriteTime
       case None => ""
     }
-    stageLogInfo(stageId, status + info + executorRunTime + gcTime + inputMetrics +
+    stageLogInfo(stageId, status + info + executorRunTime + gcTime + inputMetrics + outputMetrics +
       shuffleReadMetrics + writeMetrics)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/3c2cff4b/core/src/main/scala/org/apache/spark/ui/ToolTips.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/ToolTips.scala b/core/src/main/scala/org/apache/spark/ui/ToolTips.scala
index 51dc08f..6f446c5 100644
--- a/core/src/main/scala/org/apache/spark/ui/ToolTips.scala
+++ b/core/src/main/scala/org/apache/spark/ui/ToolTips.scala
@@ -29,6 +29,8 @@ private[spark] object ToolTips {
 
   val INPUT = "Bytes read from Hadoop or from Spark storage."
 
+  val OUTPUT = "Bytes written to Hadoop."
+
   val SHUFFLE_WRITE = "Bytes written to disk in order to be read by a shuffle in a future stage."
 
   val SHUFFLE_READ =

http://git-wip-us.apache.org/repos/asf/spark/blob/3c2cff4b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
index ba97630..dd1c2b7 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
@@ -48,6 +48,7 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener) extends Sp
   val executorToTasksFailed = HashMap[String, Int]()
   val executorToDuration = HashMap[String, Long]()
   val executorToInputBytes = HashMap[String, Long]()
+  val executorToOutputBytes = HashMap[String, Long]()
   val executorToShuffleRead = HashMap[String, Long]()
   val executorToShuffleWrite = HashMap[String, Long]()
 
@@ -78,6 +79,10 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener) extends Sp
           executorToInputBytes(eid) =
             executorToInputBytes.getOrElse(eid, 0L) + inputMetrics.bytesRead
         }
+        metrics.outputMetrics.foreach { outputMetrics =>
+          executorToOutputBytes(eid) =
+            executorToOutputBytes.getOrElse(eid, 0L) + outputMetrics.bytesWritten
+        }
         metrics.shuffleReadMetrics.foreach { shuffleRead =>
           executorToShuffleRead(eid) =
             executorToShuffleRead.getOrElse(eid, 0L) + shuffleRead.remoteBytesRead

http://git-wip-us.apache.org/repos/asf/spark/blob/3c2cff4b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
index f0e43fb..fa0f96b 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
@@ -45,6 +45,7 @@ private[ui] class ExecutorTable(stageId: Int, stageAttemptId: Int, parent: JobPr
         <th>Failed Tasks</th>
         <th>Succeeded Tasks</th>
         <th><span data-toggle="tooltip" title={ToolTips.INPUT}>Input</span></th>
+        <th><span data-toggle="tooltip" title={ToolTips.OUTPUT}>Output</span></th>
         <th><span data-toggle="tooltip" title={ToolTips.SHUFFLE_READ}>Shuffle Read</span></th>
         <th><span data-toggle="tooltip" title={ToolTips.SHUFFLE_WRITE}>Shuffle Write</span></th>
         <th>Shuffle Spill (Memory)</th>
@@ -77,6 +78,8 @@ private[ui] class ExecutorTable(stageId: Int, stageAttemptId: Int, parent: JobPr
             <td>{v.succeededTasks}</td>
             <td sorttable_customkey={v.inputBytes.toString}>
               {Utils.bytesToString(v.inputBytes)}</td>
+            <td sorttable_customkey={v.outputBytes.toString}>
+              {Utils.bytesToString(v.outputBytes)}</td>
             <td sorttable_customkey={v.shuffleRead.toString}>
               {Utils.bytesToString(v.shuffleRead)}</td>
             <td sorttable_customkey={v.shuffleWrite.toString}>

http://git-wip-us.apache.org/repos/asf/spark/blob/3c2cff4b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index e322340..8bbde51 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -259,6 +259,12 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
     stageData.inputBytes += inputBytesDelta
     execSummary.inputBytes += inputBytesDelta
 
+    val outputBytesDelta =
+      (taskMetrics.outputMetrics.map(_.bytesWritten).getOrElse(0L)
+        - oldMetrics.flatMap(_.outputMetrics).map(_.bytesWritten).getOrElse(0L))
+    stageData.outputBytes += outputBytesDelta
+    execSummary.outputBytes += outputBytesDelta
+
     val diskSpillDelta =
       taskMetrics.diskBytesSpilled - oldMetrics.map(_.diskBytesSpilled).getOrElse(0L)
     stageData.diskBytesSpilled += diskSpillDelta

http://git-wip-us.apache.org/repos/asf/spark/blob/3c2cff4b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 250bddb..16bc3f6 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -57,6 +57,7 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
       val accumulables = listener.stageIdToData((stageId, stageAttemptId)).accumulables
       val hasAccumulators = accumulables.size > 0
       val hasInput = stageData.inputBytes > 0
+      val hasOutput = stageData.outputBytes > 0
       val hasShuffleRead = stageData.shuffleReadBytes > 0
       val hasShuffleWrite = stageData.shuffleWriteBytes > 0
       val hasBytesSpilled = stageData.memoryBytesSpilled > 0 && stageData.diskBytesSpilled > 0
@@ -74,6 +75,12 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
                 {Utils.bytesToString(stageData.inputBytes)}
               </li>
             }}
+            {if (hasOutput) {
+              <li>
+                <strong>Output: </strong>
+                {Utils.bytesToString(stageData.outputBytes)}
+              </li>
+            }}
             {if (hasShuffleRead) {
               <li>
                 <strong>Shuffle read: </strong>
@@ -162,6 +169,7 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
           ("Getting Result Time", TaskDetailsClassNames.GETTING_RESULT_TIME)) ++
         {if (hasAccumulators) Seq(("Accumulators", "")) else Nil} ++
         {if (hasInput) Seq(("Input", "")) else Nil} ++
+        {if (hasOutput) Seq(("Output", "")) else Nil} ++
         {if (hasShuffleRead) Seq(("Shuffle Read", ""))  else Nil} ++
         {if (hasShuffleWrite) Seq(("Write Time", ""), ("Shuffle Write", "")) else Nil} ++
         {if (hasBytesSpilled) Seq(("Shuffle Spill (Memory)", ""), ("Shuffle Spill (Disk)", ""))
@@ -172,7 +180,8 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
 
       val taskTable = UIUtils.listingTable(
         unzipped._1,
-        taskRow(hasAccumulators, hasInput, hasShuffleRead, hasShuffleWrite, hasBytesSpilled),
+        taskRow(hasAccumulators, hasInput, hasOutput, hasShuffleRead, hasShuffleWrite,
+          hasBytesSpilled),
         tasks,
         headerClasses = unzipped._2)
       // Excludes tasks which failed and have incomplete metrics
@@ -260,6 +269,11 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
           }
           val inputQuantiles = <td>Input</td> +: getFormattedSizeQuantiles(inputSizes)
 
+          val outputSizes = validTasks.map { case TaskUIData(_, metrics, _) =>
+            metrics.get.outputMetrics.map(_.bytesWritten).getOrElse(0L).toDouble
+          }
+          val outputQuantiles = <td>Output</td> +: getFormattedSizeQuantiles(outputSizes)
+
           val shuffleReadSizes = validTasks.map { case TaskUIData(_, metrics, _) =>
             metrics.get.shuffleReadMetrics.map(_.remoteBytesRead).getOrElse(0L).toDouble
           }
@@ -296,6 +310,7 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
             </tr>,
             <tr class={TaskDetailsClassNames.GETTING_RESULT_TIME}>{gettingResultQuantiles}</tr>,
             if (hasInput) <tr>{inputQuantiles}</tr> else Nil,
+            if (hasOutput) <tr>{outputQuantiles}</tr> else Nil,
             if (hasShuffleRead) <tr>{shuffleReadQuantiles}</tr> else Nil,
             if (hasShuffleWrite) <tr>{shuffleWriteQuantiles}</tr> else Nil,
             if (hasBytesSpilled) <tr>{memoryBytesSpilledQuantiles}</tr> else Nil,
@@ -328,6 +343,7 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
   def taskRow(
       hasAccumulators: Boolean,
       hasInput: Boolean,
+      hasOutput: Boolean,
       hasShuffleRead: Boolean,
       hasShuffleWrite: Boolean,
       hasBytesSpilled: Boolean)(taskData: TaskUIData): Seq[Node] = {
@@ -351,6 +367,12 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
         .map(m => s"${Utils.bytesToString(m.bytesRead)} (${m.readMethod.toString.toLowerCase()})")
         .getOrElse("")
 
+      val maybeOutput = metrics.flatMap(_.outputMetrics)
+      val outputSortable = maybeOutput.map(_.bytesWritten.toString).getOrElse("")
+      val outputReadable = maybeOutput
+        .map(m => s"${Utils.bytesToString(m.bytesWritten)}")
+        .getOrElse("")
+
       val maybeShuffleRead = metrics.flatMap(_.shuffleReadMetrics).map(_.remoteBytesRead)
       val shuffleReadSortable = maybeShuffleRead.map(_.toString).getOrElse("")
       val shuffleReadReadable = maybeShuffleRead.map(Utils.bytesToString).getOrElse("")
@@ -417,6 +439,11 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
             {inputReadable}
           </td>
         }}
+        {if (hasOutput) {
+          <td sorttable_customkey={outputSortable}>
+            {outputReadable}
+          </td>
+        }}
         {if (hasShuffleRead) {
            <td sorttable_customkey={shuffleReadSortable}>
              {shuffleReadReadable}

http://git-wip-us.apache.org/repos/asf/spark/blob/3c2cff4b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
index 3b4866e..eae542d 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
@@ -45,6 +45,7 @@ private[ui] class StageTableBase(
     <th>Duration</th>
     <th>Tasks: Succeeded/Total</th>
     <th><span data-toggle="tooltip" title={ToolTips.INPUT}>Input</span></th>
+    <th><span data-toggle="tooltip" title={ToolTips.OUTPUT}>Output</span></th>
     <th><span data-toggle="tooltip" title={ToolTips.SHUFFLE_READ}>Shuffle Read</span></th>
     <th>
       <!-- Place the shuffle write tooltip on the left (rather than the default position
@@ -151,6 +152,8 @@ private[ui] class StageTableBase(
 
     val inputRead = stageData.inputBytes
     val inputReadWithUnit = if (inputRead > 0) Utils.bytesToString(inputRead) else ""
+    val outputWrite = stageData.outputBytes
+    val outputWriteWithUnit = if (outputWrite > 0) Utils.bytesToString(outputWrite) else ""
     val shuffleRead = stageData.shuffleReadBytes
     val shuffleReadWithUnit = if (shuffleRead > 0) Utils.bytesToString(shuffleRead) else ""
     val shuffleWrite = stageData.shuffleWriteBytes
@@ -179,6 +182,7 @@ private[ui] class StageTableBase(
         stageData.numFailedTasks, s.numTasks)}
     </td>
     <td sorttable_customkey={inputRead.toString}>{inputReadWithUnit}</td>
+    <td sorttable_customkey={outputWrite.toString}>{outputWriteWithUnit}</td>
     <td sorttable_customkey={shuffleRead.toString}>{shuffleReadWithUnit}</td>
     <td sorttable_customkey={shuffleWrite.toString}>{shuffleWriteWithUnit}</td>
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/3c2cff4b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
index e2813f8..2f7d618 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
@@ -31,6 +31,7 @@ private[jobs] object UIData {
     var failedTasks : Int = 0
     var succeededTasks : Int = 0
     var inputBytes : Long = 0
+    var outputBytes : Long = 0
     var shuffleRead : Long = 0
     var shuffleWrite : Long = 0
     var memoryBytesSpilled : Long = 0
@@ -53,6 +54,7 @@ private[jobs] object UIData {
     var executorRunTime: Long = _
 
     var inputBytes: Long = _
+    var outputBytes: Long = _
     var shuffleReadBytes: Long = _
     var shuffleWriteBytes: Long = _
     var memoryBytesSpilled: Long = _

http://git-wip-us.apache.org/repos/asf/spark/blob/3c2cff4b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index f15d0c8..7e536ed 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -26,9 +26,7 @@ import org.json4s.DefaultFormats
 import org.json4s.JsonDSL._
 import org.json4s.JsonAST._
 
-
-import org.apache.spark.executor.{DataReadMethod, InputMetrics, ShuffleReadMetrics,
-  ShuffleWriteMetrics, TaskMetrics}
+import org.apache.spark.executor._
 import org.apache.spark.scheduler._
 import org.apache.spark.storage._
 import org.apache.spark._
@@ -232,6 +230,8 @@ private[spark] object JsonProtocol {
       taskMetrics.shuffleWriteMetrics.map(shuffleWriteMetricsToJson).getOrElse(JNothing)
     val inputMetrics =
       taskMetrics.inputMetrics.map(inputMetricsToJson).getOrElse(JNothing)
+    val outputMetrics =
+      taskMetrics.outputMetrics.map(outputMetricsToJson).getOrElse(JNothing)
     val updatedBlocks =
       taskMetrics.updatedBlocks.map { blocks =>
         JArray(blocks.toList.map { case (id, status) =>
@@ -250,6 +250,7 @@ private[spark] object JsonProtocol {
     ("Shuffle Read Metrics" -> shuffleReadMetrics) ~
     ("Shuffle Write Metrics" -> shuffleWriteMetrics) ~
     ("Input Metrics" -> inputMetrics) ~
+    ("Output Metrics" -> outputMetrics) ~
     ("Updated Blocks" -> updatedBlocks)
   }
 
@@ -270,6 +271,11 @@ private[spark] object JsonProtocol {
     ("Bytes Read" -> inputMetrics.bytesRead)
   }
 
+  def outputMetricsToJson(outputMetrics: OutputMetrics): JValue = {
+    ("Data Write Method" -> outputMetrics.writeMethod.toString) ~
+    ("Bytes Written" -> outputMetrics.bytesWritten)
+  }
+
   def taskEndReasonToJson(taskEndReason: TaskEndReason): JValue = {
     val reason = Utils.getFormattedClassName(taskEndReason)
     val json: JObject = taskEndReason match {
@@ -579,6 +585,8 @@ private[spark] object JsonProtocol {
       Utils.jsonOption(json \ "Shuffle Write Metrics").map(shuffleWriteMetricsFromJson)
     metrics.inputMetrics =
       Utils.jsonOption(json \ "Input Metrics").map(inputMetricsFromJson)
+    metrics.outputMetrics =
+      Utils.jsonOption(json \ "Output Metrics").map(outputMetricsFromJson)
     metrics.updatedBlocks =
       Utils.jsonOption(json \ "Updated Blocks").map { value =>
         value.extract[List[JValue]].map { block =>
@@ -613,6 +621,13 @@ private[spark] object JsonProtocol {
     metrics
   }
 
+  def outputMetricsFromJson(json: JValue): OutputMetrics = {
+    val metrics = new OutputMetrics(
+      DataWriteMethod.withName((json \ "Data Write Method").extract[String]))
+    metrics.bytesWritten = (json \ "Bytes Written").extract[Long]
+    metrics
+  }
+
   def taskEndReasonFromJson(json: JValue): TaskEndReason = {
     val success = Utils.getFormattedClassName(Success)
     val resubmitted = Utils.getFormattedClassName(Resubmitted)

http://git-wip-us.apache.org/repos/asf/spark/blob/3c2cff4b/core/src/test/scala/org/apache/spark/metrics/InputMetricsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/metrics/InputMetricsSuite.scala b/core/src/test/scala/org/apache/spark/metrics/InputMetricsSuite.scala
deleted file mode 100644
index 48c386b..0000000
--- a/core/src/test/scala/org/apache/spark/metrics/InputMetricsSuite.scala
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.metrics
-
-import org.scalatest.FunSuite
-
-import org.apache.spark.SharedSparkContext
-import org.apache.spark.scheduler.{SparkListenerTaskEnd, SparkListener}
-
-import scala.collection.mutable.ArrayBuffer
-
-import java.io.{FileWriter, PrintWriter, File}
-
-class InputMetricsSuite extends FunSuite with SharedSparkContext {
-  test("input metrics when reading text file with single split") {
-    val file = new File(getClass.getSimpleName + ".txt")
-    val pw = new PrintWriter(new FileWriter(file))
-    pw.println("some stuff")
-    pw.println("some other stuff")
-    pw.println("yet more stuff")
-    pw.println("too much stuff")
-    pw.close()
-    file.deleteOnExit()
-
-    val taskBytesRead = new ArrayBuffer[Long]()
-    sc.addSparkListener(new SparkListener() {
-      override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
-        taskBytesRead += taskEnd.taskMetrics.inputMetrics.get.bytesRead
-      }
-    })
-    sc.textFile("file://" + file.getAbsolutePath, 2).count()
-
-    // Wait for task end events to come in
-    sc.listenerBus.waitUntilEmpty(500)
-    assert(taskBytesRead.length == 2)
-    assert(taskBytesRead.sum >= file.length())
-  }
-
-  test("input metrics when reading text file with multiple splits") {
-    val file = new File(getClass.getSimpleName + ".txt")
-    val pw = new PrintWriter(new FileWriter(file))
-    for (i <- 0 until 10000) {
-      pw.println("some stuff")
-    }
-    pw.close()
-    file.deleteOnExit()
-
-    val taskBytesRead = new ArrayBuffer[Long]()
-    sc.addSparkListener(new SparkListener() {
-      override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
-        taskBytesRead += taskEnd.taskMetrics.inputMetrics.get.bytesRead
-      }
-    })
-    sc.textFile("file://" + file.getAbsolutePath, 2).count()
-
-    // Wait for task end events to come in
-    sc.listenerBus.waitUntilEmpty(500)
-    assert(taskBytesRead.length == 2)
-    assert(taskBytesRead.sum >= file.length())
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/3c2cff4b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
new file mode 100644
index 0000000..ca226fd
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.metrics
+
+import java.io.{FileWriter, PrintWriter, File}
+
+import org.apache.spark.SharedSparkContext
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.scheduler.{SparkListenerTaskEnd, SparkListener}
+
+import org.scalatest.FunSuite
+import org.scalatest.matchers.ShouldMatchers
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{Path, FileSystem}
+
+import scala.collection.mutable.ArrayBuffer
+
+class InputOutputMetricsSuite extends FunSuite with SharedSparkContext with ShouldMatchers {
+  test("input metrics when reading text file with single split") {
+    val file = new File(getClass.getSimpleName + ".txt")
+    val pw = new PrintWriter(new FileWriter(file))
+    pw.println("some stuff")
+    pw.println("some other stuff")
+    pw.println("yet more stuff")
+    pw.println("too much stuff")
+    pw.close()
+    file.deleteOnExit()
+
+    val taskBytesRead = new ArrayBuffer[Long]()
+    sc.addSparkListener(new SparkListener() {
+      override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
+        taskBytesRead += taskEnd.taskMetrics.inputMetrics.get.bytesRead
+      }
+    })
+    sc.textFile("file://" + file.getAbsolutePath, 2).count()
+
+    // Wait for task end events to come in
+    sc.listenerBus.waitUntilEmpty(500)
+    assert(taskBytesRead.length == 2)
+    assert(taskBytesRead.sum >= file.length())
+  }
+
+  test("input metrics when reading text file with multiple splits") {
+    val file = new File(getClass.getSimpleName + ".txt")
+    val pw = new PrintWriter(new FileWriter(file))
+    for (i <- 0 until 10000) {
+      pw.println("some stuff")
+    }
+    pw.close()
+    file.deleteOnExit()
+
+    val taskBytesRead = new ArrayBuffer[Long]()
+    sc.addSparkListener(new SparkListener() {
+      override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
+        taskBytesRead += taskEnd.taskMetrics.inputMetrics.get.bytesRead
+      }
+    })
+    sc.textFile("file://" + file.getAbsolutePath, 2).count()
+
+    // Wait for task end events to come in
+    sc.listenerBus.waitUntilEmpty(500)
+    assert(taskBytesRead.length == 2)
+    assert(taskBytesRead.sum >= file.length())
+  }
+
+  test("output metrics when writing text file") {
+    val fs = FileSystem.getLocal(new Configuration())
+    val outPath = new Path(fs.getWorkingDirectory, "outdir")
+
+    if (SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback(outPath, fs.getConf).isDefined) {
+      val taskBytesWritten = new ArrayBuffer[Long]()
+      sc.addSparkListener(new SparkListener() {
+        override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
+          taskBytesWritten += taskEnd.taskMetrics.outputMetrics.get.bytesWritten
+        }
+      })
+
+      val rdd = sc.parallelize(Array("a", "b", "c", "d"), 2)
+
+      try {
+        rdd.saveAsTextFile(outPath.toString)
+        sc.listenerBus.waitUntilEmpty(500)
+        assert(taskBytesWritten.length == 2)
+        val outFiles = fs.listStatus(outPath).filter(_.getPath.getName != "_SUCCESS")
+        taskBytesWritten.zip(outFiles).foreach { case (bytes, fileStatus) =>
+          assert(bytes >= fileStatus.getLen)
+        }
+      } finally {
+        fs.delete(outPath, true)
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/3c2cff4b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index ab35e8e..abe0dc3 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -252,6 +252,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
         taskMetrics.resultSize should be > (0l)
         if (stageInfo.rddInfos.exists(info => info.name == d2.name || info.name == d3.name)) {
           taskMetrics.inputMetrics should not be ('defined)
+          taskMetrics.outputMetrics should not be ('defined)
           taskMetrics.shuffleWriteMetrics should be ('defined)
           taskMetrics.shuffleWriteMetrics.get.shuffleBytesWritten should be > (0l)
         }

http://git-wip-us.apache.org/repos/asf/spark/blob/3c2cff4b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index 2608ad4..7c102cc 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -159,6 +159,9 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
       val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
       taskMetrics.inputMetrics = Some(inputMetrics)
       inputMetrics.bytesRead = base + 7
+      val outputMetrics = new OutputMetrics(DataWriteMethod.Hadoop)
+      taskMetrics.outputMetrics = Some(outputMetrics)
+      outputMetrics.bytesWritten = base + 8
       taskMetrics
     }
 
@@ -193,6 +196,8 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
     assert(stage1Data.memoryBytesSpilled == 206)
     assert(stage0Data.inputBytes == 114)
     assert(stage1Data.inputBytes == 207)
+    assert(stage0Data.outputBytes == 116)
+    assert(stage1Data.outputBytes == 208)
     assert(stage0Data.taskData.get(1234L).get.taskMetrics.get.shuffleReadMetrics.get
       .totalBlocksFetched == 2)
     assert(stage0Data.taskData.get(1235L).get.taskMetrics.get.shuffleReadMetrics.get
@@ -221,6 +226,8 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
     assert(stage1Data.memoryBytesSpilled == 612)
     assert(stage0Data.inputBytes == 414)
     assert(stage1Data.inputBytes == 614)
+    assert(stage0Data.outputBytes == 416)
+    assert(stage1Data.outputBytes == 616)
     assert(stage0Data.taskData.get(1234L).get.taskMetrics.get.shuffleReadMetrics.get
       .totalBlocksFetched == 302)
     assert(stage1Data.taskData.get(1237L).get.taskMetrics.get.shuffleReadMetrics.get

http://git-wip-us.apache.org/repos/asf/spark/blob/3c2cff4b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 39e6985..50f4205 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -21,9 +21,6 @@ import java.util.Properties
 
 import scala.collection.Map
 
-import org.json4s.DefaultFormats
-import org.json4s.JsonDSL._
-import org.json4s.JsonAST._
 import org.json4s.jackson.JsonMethods._
 import org.scalatest.FunSuite
 
@@ -43,10 +40,13 @@ class JsonProtocolSuite extends FunSuite {
       SparkListenerTaskGettingResult(makeTaskInfo(1000L, 2000, 5, 3000L, true))
     val taskEnd = SparkListenerTaskEnd(1, 0, "ShuffleMapTask", Success,
       makeTaskInfo(123L, 234, 67, 345L, false),
-      makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800, hasHadoopInput = false))
+      makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800, hasHadoopInput = false, hasOutput = false))
     val taskEndWithHadoopInput = SparkListenerTaskEnd(1, 0, "ShuffleMapTask", Success,
       makeTaskInfo(123L, 234, 67, 345L, false),
-      makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800, hasHadoopInput = true))
+      makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800, hasHadoopInput = true, hasOutput = false))
+    val taskEndWithOutput = SparkListenerTaskEnd(1, 0, "ResultTask", Success,
+      makeTaskInfo(123L, 234, 67, 345L, false),
+      makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800, hasHadoopInput = true, hasOutput = true))
     val jobStart = SparkListenerJobStart(10, Seq[Int](1, 2, 3, 4), properties)
     val jobEnd = SparkListenerJobEnd(20, JobSucceeded)
     val environmentUpdate = SparkListenerEnvironmentUpdate(Map[String, Seq[(String, String)]](
@@ -69,6 +69,7 @@ class JsonProtocolSuite extends FunSuite {
     testEvent(taskGettingResult, taskGettingResultJsonString)
     testEvent(taskEnd, taskEndJsonString)
     testEvent(taskEndWithHadoopInput, taskEndWithHadoopInputJsonString)
+    testEvent(taskEndWithOutput, taskEndWithOutputJsonString)
     testEvent(jobStart, jobStartJsonString)
     testEvent(jobEnd, jobEndJsonString)
     testEvent(environmentUpdate, environmentUpdateJsonString)
@@ -83,7 +84,8 @@ class JsonProtocolSuite extends FunSuite {
     testRDDInfo(makeRddInfo(2, 3, 4, 5L, 6L))
     testStageInfo(makeStageInfo(10, 20, 30, 40L, 50L))
     testTaskInfo(makeTaskInfo(999L, 888, 55, 777L, false))
-    testTaskMetrics(makeTaskMetrics(33333L, 44444L, 55555L, 66666L, 7, 8, hasHadoopInput = false))
+    testTaskMetrics(makeTaskMetrics(
+      33333L, 44444L, 55555L, 66666L, 7, 8, hasHadoopInput = false, hasOutput = false))
     testBlockManagerId(BlockManagerId("Hong", "Kong", 500))
 
     // StorageLevel
@@ -154,7 +156,7 @@ class JsonProtocolSuite extends FunSuite {
 
   test("InputMetrics backward compatibility") {
     // InputMetrics were added after 1.0.1.
-    val metrics = makeTaskMetrics(1L, 2L, 3L, 4L, 5, 6, hasHadoopInput = true)
+    val metrics = makeTaskMetrics(1L, 2L, 3L, 4L, 5, 6, hasHadoopInput = true, hasOutput = false)
     assert(metrics.inputMetrics.nonEmpty)
     val newJson = JsonProtocol.taskMetricsToJson(metrics)
     val oldJson = newJson.removeField { case (field, _) => field == "Input Metrics" }
@@ -162,6 +164,16 @@ class JsonProtocolSuite extends FunSuite {
     assert(newMetrics.inputMetrics.isEmpty)
   }
 
+  test("OutputMetrics backward compatibility") {
+    // OutputMetrics were added after 1.1
+    val metrics = makeTaskMetrics(1L, 2L, 3L, 4L, 5, 6, hasHadoopInput = false, hasOutput = true)
+    assert(metrics.outputMetrics.nonEmpty)
+    val newJson = JsonProtocol.taskMetricsToJson(metrics)
+    val oldJson = newJson.removeField { case (field, _) => field == "Output Metrics" }
+    val newMetrics = JsonProtocol.taskMetricsFromJson(oldJson)
+    assert(newMetrics.outputMetrics.isEmpty)
+  }
+
   test("BlockManager events backward compatibility") {
     // SparkListenerBlockManagerAdded/Removed in Spark 1.0.0 do not have a "time" property.
     val blockManagerAdded = SparkListenerBlockManagerAdded(1L,
@@ -581,9 +593,9 @@ class JsonProtocolSuite extends FunSuite {
       d: Long,
       e: Int,
       f: Int,
-      hasHadoopInput: Boolean) = {
+      hasHadoopInput: Boolean,
+      hasOutput: Boolean) = {
     val t = new TaskMetrics
-    val sw = new ShuffleWriteMetrics
     t.hostname = "localhost"
     t.executorDeserializeTime = a
     t.executorRunTime = b
@@ -604,9 +616,16 @@ class JsonProtocolSuite extends FunSuite {
       sr.remoteBlocksFetched = f
       t.setShuffleReadMetrics(Some(sr))
     }
-    sw.shuffleBytesWritten = a + b + c
-    sw.shuffleWriteTime = b + c + d
-    t.shuffleWriteMetrics = Some(sw)
+    if (hasOutput) {
+      val outputMetrics = new OutputMetrics(DataWriteMethod.Hadoop)
+      outputMetrics.bytesWritten = a + b + c
+      t.outputMetrics = Some(outputMetrics)
+    } else {
+      val sw = new ShuffleWriteMetrics
+      sw.shuffleBytesWritten = a + b + c
+      sw.shuffleWriteTime = b + c + d
+      t.shuffleWriteMetrics = Some(sw)
+    }
     // Make at most 6 blocks
     t.updatedBlocks = Some((1 to (e % 5 + 1)).map { i =>
       (RDDBlockId(e % i, f % i), BlockStatus(StorageLevel.MEMORY_AND_DISK_SER_2, a % i, b % i, c%i))
@@ -946,6 +965,87 @@ class JsonProtocolSuite extends FunSuite {
       |}
     """
 
+  private val taskEndWithOutputJsonString =
+    """
+      |{
+      |  "Event": "SparkListenerTaskEnd",
+      |  "Stage ID": 1,
+      |  "Stage Attempt ID": 0,
+      |  "Task Type": "ResultTask",
+      |  "Task End Reason": {
+      |    "Reason": "Success"
+      |  },
+      |  "Task Info": {
+      |    "Task ID": 123,
+      |    "Index": 234,
+      |    "Attempt": 67,
+      |    "Launch Time": 345,
+      |    "Executor ID": "executor",
+      |    "Host": "your kind sir",
+      |    "Locality": "NODE_LOCAL",
+      |    "Speculative": false,
+      |    "Getting Result Time": 0,
+      |    "Finish Time": 0,
+      |    "Failed": false,
+      |    "Accumulables": [
+      |      {
+      |        "ID": 1,
+      |        "Name": "Accumulable1",
+      |        "Update": "delta1",
+      |        "Value": "val1"
+      |      },
+      |      {
+      |        "ID": 2,
+      |        "Name": "Accumulable2",
+      |        "Update": "delta2",
+      |        "Value": "val2"
+      |      },
+      |      {
+      |        "ID": 3,
+      |        "Name": "Accumulable3",
+      |        "Update": "delta3",
+      |        "Value": "val3"
+      |      }
+      |    ]
+      |  },
+      |  "Task Metrics": {
+      |    "Host Name": "localhost",
+      |    "Executor Deserialize Time": 300,
+      |    "Executor Run Time": 400,
+      |    "Result Size": 500,
+      |    "JVM GC Time": 600,
+      |    "Result Serialization Time": 700,
+      |    "Memory Bytes Spilled": 800,
+      |    "Disk Bytes Spilled": 0,
+      |    "Input Metrics": {
+      |      "Data Read Method": "Hadoop",
+      |      "Bytes Read": 2100
+      |    },
+      |    "Output Metrics": {
+      |      "Data Write Method": "Hadoop",
+      |      "Bytes Written": 1200
+      |    },
+      |    "Updated Blocks": [
+      |      {
+      |        "Block ID": "rdd_0_0",
+      |        "Status": {
+      |          "Storage Level": {
+      |            "Use Disk": true,
+      |            "Use Memory": true,
+      |            "Use Tachyon": false,
+      |            "Deserialized": false,
+      |            "Replication": 2
+      |          },
+      |          "Memory Size": 0,
+      |          "Tachyon Size": 0,
+      |          "Disk Size": 0
+      |        }
+      |      }
+      |    ]
+      |  }
+      |}
+    """
+
   private val jobStartJsonString =
     """
       |{


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org