You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ka...@apache.org on 2014/06/30 07:01:34 UTC

git commit: [SPARK-1683] Track task read metrics.

Repository: spark
Updated Branches:
  refs/heads/master cdf613fc5 -> 7b71a0e09


[SPARK-1683] Track task read metrics.

This commit adds a new metric in TaskMetrics to record
the input data size and displays this information in the UI.

An earlier version of this commit also added the read time,
which can be useful for diagnosing straggler problems,
but unfortunately that change introduced a significant performance
regression for jobs that don't do much computation. In order to
track read time, we'll need to do sampling.

The screenshots below show the UI with the new "Input" field,
which I added to the stage summary page, the executor summary page,
and the per-stage page.

![image](https://cloud.githubusercontent.com/assets/1108612/3167930/2627f92a-eb77-11e3-861c-98ea5bb7a1a2.png)

![image](https://cloud.githubusercontent.com/assets/1108612/3167936/475a889c-eb77-11e3-9706-f11c48751f17.png)

![image](https://cloud.githubusercontent.com/assets/1108612/3167948/80ebcf12-eb77-11e3-87ed-349fce6a770c.png)

Author: Kay Ousterhout <ka...@gmail.com>

Closes #962 from kayousterhout/read_metrics and squashes the following commits:

f13b67d [Kay Ousterhout] Correctly format input bytes on executor page
8b70cde [Kay Ousterhout] Added comment about potential inaccuracy of bytesRead
d1016e8 [Kay Ousterhout] Updated SparkListenerSuite test
8461492 [Kay Ousterhout] Minuscule style fix
ae04d99 [Kay Ousterhout] Remove input metrics for parallel collections
719f19d [Kay Ousterhout] Style fixes
bb6ec62 [Kay Ousterhout] Small fixes
869ac7b [Kay Ousterhout] Updated Json tests
44a0301 [Kay Ousterhout] Fixed accidentally added line
4bd0568 [Kay Ousterhout] Added input source, renamed Hdfs to Hadoop.
f27e535 [Kay Ousterhout] Updates based on review comments and to fix rebase
bf41029 [Kay Ousterhout] Updated Json tests to pass
0fc33e0 [Kay Ousterhout] Added explicit backward compatibility test
4e52925 [Kay Ousterhout] Added Json output and associated tests.
365400b [Kay Ousterhout] [SPARK-1683] Track task read metrics.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7b71a0e0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7b71a0e0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7b71a0e0

Branch: refs/heads/master
Commit: 7b71a0e09622e09285a9884ebb67b5fb1c5caa53
Parents: cdf613f
Author: Kay Ousterhout <ka...@gmail.com>
Authored: Sun Jun 29 22:01:42 2014 -0700
Committer: Kay Ousterhout <ka...@gmail.com>
Committed: Sun Jun 29 22:01:42 2014 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/CacheManager.scala   |  10 +-
 .../org/apache/spark/executor/TaskMetrics.scala |  29 ++++++
 .../scala/org/apache/spark/rdd/BlockRDD.scala   |   2 +-
 .../scala/org/apache/spark/rdd/HadoopRDD.scala  |  15 +++
 .../org/apache/spark/rdd/NewHadoopRDD.scala     |  13 +++
 .../org/apache/spark/scheduler/JobLogger.scala  |  15 ++-
 .../org/apache/spark/storage/BlockManager.scala |  63 +++++++-----
 .../apache/spark/storage/ThreadingTest.scala    |   2 +-
 .../apache/spark/ui/exec/ExecutorsPage.scala    |   4 +
 .../org/apache/spark/ui/exec/ExecutorsTab.scala |   5 +
 .../apache/spark/ui/jobs/ExecutorSummary.scala  |   1 +
 .../apache/spark/ui/jobs/ExecutorTable.scala    |   2 +
 .../spark/ui/jobs/JobProgressListener.scala     |  15 ++-
 .../org/apache/spark/ui/jobs/StagePage.scala    |  41 ++++++--
 .../org/apache/spark/ui/jobs/StageTable.scala   |   7 ++
 .../org/apache/spark/util/JsonProtocol.scala    |  20 +++-
 .../org/apache/spark/CacheManagerSuite.scala    |   4 +-
 .../spark/scheduler/SparkListenerSuite.scala    |   1 +
 .../spark/storage/BlockManagerSuite.scala       |  84 ++++++++++-----
 .../apache/spark/util/JsonProtocolSuite.scala   | 102 ++++++++++++++++---
 20 files changed, 349 insertions(+), 86 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7b71a0e0/core/src/main/scala/org/apache/spark/CacheManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala
index 3f667a4..8f86768 100644
--- a/core/src/main/scala/org/apache/spark/CacheManager.scala
+++ b/core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -19,6 +19,7 @@ package org.apache.spark
 
 import scala.collection.mutable.{ArrayBuffer, HashSet}
 
+import org.apache.spark.executor.InputMetrics
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage._
 
@@ -41,9 +42,10 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
     val key = RDDBlockId(rdd.id, partition.index)
     logDebug(s"Looking for partition $key")
     blockManager.get(key) match {
-      case Some(values) =>
+      case Some(blockResult) =>
         // Partition is already materialized, so just return its values
-        new InterruptibleIterator(context, values.asInstanceOf[Iterator[T]])
+        context.taskMetrics.inputMetrics = Some(blockResult.inputMetrics)
+        new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]])
 
       case None =>
         // Acquire a lock for loading this partition
@@ -110,7 +112,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
           logInfo(s"Whoever was loading $id failed; we'll try it ourselves")
           loading.add(id)
         }
-        values.map(_.asInstanceOf[Iterator[T]])
+        values.map(_.data.asInstanceOf[Iterator[T]])
       }
     }
   }
@@ -132,7 +134,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
        * exceptions that can be avoided. */
       updatedBlocks ++= blockManager.put(key, values, storageLevel, tellMaster = true)
       blockManager.get(key) match {
-        case Some(v) => v.asInstanceOf[Iterator[T]]
+        case Some(v) => v.data.asInstanceOf[Iterator[T]]
         case None =>
           logInfo(s"Failure to store $key")
           throw new BlockException(key, s"Block manager failed to return cached value for $key!")

http://git-wip-us.apache.org/repos/asf/spark/blob/7b71a0e0/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index 350fd74..ac73288 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -67,6 +67,12 @@ class TaskMetrics extends Serializable {
   var diskBytesSpilled: Long = _
 
   /**
+   * If this task reads from a HadoopRDD or from persisted data, metrics on how much data was read
+   * are stored here.
+   */
+  var inputMetrics: Option[InputMetrics] = None
+
+  /**
    * If this task reads from shuffle output, metrics on getting shuffle data will be collected here
    */
   var shuffleReadMetrics: Option[ShuffleReadMetrics] = None
@@ -87,6 +93,29 @@ private[spark] object TaskMetrics {
   def empty: TaskMetrics = new TaskMetrics
 }
 
+/**
+ * :: DeveloperApi ::
+ * Method by which input data was read.  Network means that the data was read over the network
+ * from a remote block manager (which may have stored the data on-disk or in-memory).
+ */
+@DeveloperApi
+object DataReadMethod extends Enumeration with Serializable {
+  type DataReadMethod = Value
+  val Memory, Disk, Hadoop, Network = Value
+}
+
+/**
+ * :: DeveloperApi ::
+ * Metrics about reading input data.
+ */
+@DeveloperApi
+case class InputMetrics(readMethod: DataReadMethod.Value) {
+  /**
+   * Total bytes read.
+   */
+  var bytesRead: Long = 0L
+}
+
 
 /**
  * :: DeveloperApi ::

http://git-wip-us.apache.org/repos/asf/spark/blob/7b71a0e0/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala b/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
index c64da88..2673ec2 100644
--- a/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
@@ -46,7 +46,7 @@ class BlockRDD[T: ClassTag](@transient sc: SparkContext, @transient val blockIds
     val blockManager = SparkEnv.get.blockManager
     val blockId = split.asInstanceOf[BlockRDDPartition].blockId
     blockManager.get(blockId) match {
-      case Some(block) => block.asInstanceOf[Iterator[T]]
+      case Some(block) => block.data.asInstanceOf[Iterator[T]]
       case None =>
         throw new Exception("Could not compute split, block " + blockId + " not found")
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/7b71a0e0/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 2aa111d..98dcbf4 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -38,6 +38,7 @@ import org.apache.spark._
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.executor.{DataReadMethod, InputMetrics}
 import org.apache.spark.util.NextIterator
 
 /**
@@ -196,6 +197,20 @@ class HadoopRDD[K, V](
       context.addOnCompleteCallback{ () => closeIfNeeded() }
       val key: K = reader.createKey()
       val value: V = reader.createValue()
+
+      // Set the task input metrics.
+      val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
+      try {
+        /* bytesRead may not exactly equal the bytes read by a task: split boundaries aren't
+         * always at record boundaries, so tasks may need to read into other splits to complete
+         * a record. */
+        inputMetrics.bytesRead = split.inputSplit.value.getLength()
+      } catch {
+        case e: java.io.IOException =>
+          logWarning("Unable to get input size to set InputMetrics for task", e)
+      }
+      context.taskMetrics.inputMetrics = Some(inputMetrics)
+
       override def getNext() = {
         try {
           finished = !reader.next(key, value)

http://git-wip-us.apache.org/repos/asf/spark/blob/7b71a0e0/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index ac1ccc0..f2b3a64 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -31,6 +31,7 @@ import org.apache.spark.Logging
 import org.apache.spark.Partition
 import org.apache.spark.SerializableWritable
 import org.apache.spark.{SparkContext, TaskContext}
+import org.apache.spark.executor.{DataReadMethod, InputMetrics}
 
 private[spark] class NewHadoopPartition(
     rddId: Int,
@@ -112,6 +113,18 @@ class NewHadoopRDD[K, V](
         split.serializableHadoopSplit.value, hadoopAttemptContext)
       reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext)
 
+      val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
+      try {
+        /* bytesRead may not exactly equal the bytes read by a task: split boundaries aren't
+         * always at record boundaries, so tasks may need to read into other splits to complete
+         * a record. */
+        inputMetrics.bytesRead = split.serializableHadoopSplit.value.getLength()
+      } catch {
+        case e: Exception =>
+          logWarning("Unable to get input split size in order to set task input bytes", e)
+      }
+      context.taskMetrics.inputMetrics = Some(inputMetrics)
+
       // Register an on-task-completion callback to close the input stream.
       context.addOnCompleteCallback(() => close())
       var havePair = false

http://git-wip-us.apache.org/repos/asf/spark/blob/7b71a0e0/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
index a1e21ca..47dd112 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
@@ -26,7 +26,9 @@ import scala.collection.mutable.HashMap
 
 import org.apache.spark._
 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.executor.{DataReadMethod, TaskMetrics}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.storage.StorageLevel
 
 /**
  * :: DeveloperApi ::
@@ -160,7 +162,13 @@ class JobLogger(val user: String, val logDirName: String) extends SparkListener
                " START_TIME=" + taskInfo.launchTime + " FINISH_TIME=" + taskInfo.finishTime +
                " EXECUTOR_ID=" + taskInfo.executorId +  " HOST=" + taskMetrics.hostname
     val executorRunTime = " EXECUTOR_RUN_TIME=" + taskMetrics.executorRunTime
-    val readMetrics = taskMetrics.shuffleReadMetrics match {
+    val inputMetrics = taskMetrics.inputMetrics match {
+      case Some(metrics) =>
+        " READ_METHOD=" + metrics.readMethod.toString +
+        " INPUT_BYTES=" + metrics.bytesRead
+      case None => ""
+    }
+    val shuffleReadMetrics = taskMetrics.shuffleReadMetrics match {
       case Some(metrics) =>
         " SHUFFLE_FINISH_TIME=" + metrics.shuffleFinishTime +
         " BLOCK_FETCHED_TOTAL=" + metrics.totalBlocksFetched +
@@ -174,7 +182,8 @@ class JobLogger(val user: String, val logDirName: String) extends SparkListener
       case Some(metrics) => " SHUFFLE_BYTES_WRITTEN=" + metrics.shuffleBytesWritten
       case None => ""
     }
-    stageLogInfo(stageId, status + info + executorRunTime + readMetrics + writeMetrics)
+    stageLogInfo(stageId, status + info + executorRunTime + inputMetrics + shuffleReadMetrics +
+      writeMetrics)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/7b71a0e0/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index d2f7baf..0db0a5b 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -29,6 +29,7 @@ import akka.actor.{ActorSystem, Cancellable, Props}
 import sun.nio.ch.DirectBuffer
 
 import org.apache.spark._
+import org.apache.spark.executor.{DataReadMethod, InputMetrics}
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.network._
 import org.apache.spark.serializer.Serializer
@@ -39,6 +40,15 @@ private[spark] case class ByteBufferValues(buffer: ByteBuffer) extends BlockValu
 private[spark] case class IteratorValues(iterator: Iterator[Any]) extends BlockValues
 private[spark] case class ArrayBufferValues(buffer: ArrayBuffer[Any]) extends BlockValues
 
+/* Class for returning a fetched block and associated metrics. */
+private[spark] class BlockResult(
+    val data: Iterator[Any],
+    readMethod: DataReadMethod.Value,
+    bytes: Long) {
+  val inputMetrics = new InputMetrics(readMethod)
+  inputMetrics.bytesRead = bytes
+}
+
 private[spark] class BlockManager(
     executorId: String,
     actorSystem: ActorSystem,
@@ -334,9 +344,9 @@ private[spark] class BlockManager(
   /**
    * Get block from local block manager.
    */
-  def getLocal(blockId: BlockId): Option[Iterator[Any]] = {
+  def getLocal(blockId: BlockId): Option[BlockResult] = {
     logDebug(s"Getting local block $blockId")
-    doGetLocal(blockId, asValues = true).asInstanceOf[Option[Iterator[Any]]]
+    doGetLocal(blockId, asBlockResult = true).asInstanceOf[Option[BlockResult]]
   }
 
   /**
@@ -355,11 +365,11 @@ private[spark] class BlockManager(
             blockId, s"Block $blockId not found on disk, though it should be")
       }
     } else {
-      doGetLocal(blockId, asValues = false).asInstanceOf[Option[ByteBuffer]]
+      doGetLocal(blockId, asBlockResult = false).asInstanceOf[Option[ByteBuffer]]
     }
   }
 
-  private def doGetLocal(blockId: BlockId, asValues: Boolean): Option[Any] = {
+  private def doGetLocal(blockId: BlockId, asBlockResult: Boolean): Option[Any] = {
     val info = blockInfo.get(blockId).orNull
     if (info != null) {
       info.synchronized {
@@ -386,14 +396,14 @@ private[spark] class BlockManager(
         // Look for the block in memory
         if (level.useMemory) {
           logDebug(s"Getting block $blockId from memory")
-          val result = if (asValues) {
-            memoryStore.getValues(blockId)
+          val result = if (asBlockResult) {
+            memoryStore.getValues(blockId).map(new BlockResult(_, DataReadMethod.Memory, info.size))
           } else {
             memoryStore.getBytes(blockId)
           }
           result match {
             case Some(values) =>
-              return Some(values)
+              return result
             case None =>
               logDebug(s"Block $blockId not found in memory")
           }
@@ -405,10 +415,11 @@ private[spark] class BlockManager(
           if (tachyonStore.contains(blockId)) {
             tachyonStore.getBytes(blockId) match {
               case Some(bytes) =>
-                if (!asValues) {
+                if (!asBlockResult) {
                   return Some(bytes)
                 } else {
-                  return Some(dataDeserialize(blockId, bytes))
+                  return Some(new BlockResult(
+                    dataDeserialize(blockId, bytes), DataReadMethod.Memory, info.size))
                 }
               case None =>
                 logDebug(s"Block $blockId not found in tachyon")
@@ -429,14 +440,15 @@ private[spark] class BlockManager(
 
           if (!level.useMemory) {
             // If the block shouldn't be stored in memory, we can just return it
-            if (asValues) {
-              return Some(dataDeserialize(blockId, bytes))
+            if (asBlockResult) {
+              return Some(new BlockResult(dataDeserialize(blockId, bytes), DataReadMethod.Disk,
+                info.size))
             } else {
               return Some(bytes)
             }
           } else {
             // Otherwise, we also have to store something in the memory store
-            if (!level.deserialized || !asValues) {
+            if (!level.deserialized || !asBlockResult) {
               /* We'll store the bytes in memory if the block's storage level includes
                * "memory serialized", or if it should be cached as objects in memory
                * but we only requested its serialized bytes. */
@@ -445,7 +457,7 @@ private[spark] class BlockManager(
               memoryStore.putBytes(blockId, copyForMemory, level)
               bytes.rewind()
             }
-            if (!asValues) {
+            if (!asBlockResult) {
               return Some(bytes)
             } else {
               val values = dataDeserialize(blockId, bytes)
@@ -457,12 +469,12 @@ private[spark] class BlockManager(
                 memoryStore.putValues(blockId, valuesBuffer, level, returnValues = true).data
                   match {
                     case Left(values2) =>
-                      return Some(values2)
+                      return Some(new BlockResult(values2, DataReadMethod.Disk, info.size))
                     case _ =>
-                      throw new SparkException("Memory store did not return an iterator")
+                      throw new SparkException("Memory store did not return back an iterator")
                   }
               } else {
-                return Some(values)
+                return Some(new BlockResult(values, DataReadMethod.Disk, info.size))
               }
             }
           }
@@ -477,9 +489,9 @@ private[spark] class BlockManager(
   /**
    * Get block from remote block managers.
    */
-  def getRemote(blockId: BlockId): Option[Iterator[Any]] = {
+  def getRemote(blockId: BlockId): Option[BlockResult] = {
     logDebug(s"Getting remote block $blockId")
-    doGetRemote(blockId, asValues = true).asInstanceOf[Option[Iterator[Any]]]
+    doGetRemote(blockId, asBlockResult = true).asInstanceOf[Option[BlockResult]]
   }
 
   /**
@@ -487,10 +499,10 @@ private[spark] class BlockManager(
    */
   def getRemoteBytes(blockId: BlockId): Option[ByteBuffer] = {
     logDebug(s"Getting remote block $blockId as bytes")
-    doGetRemote(blockId, asValues = false).asInstanceOf[Option[ByteBuffer]]
+    doGetRemote(blockId, asBlockResult = false).asInstanceOf[Option[ByteBuffer]]
   }
 
-  private def doGetRemote(blockId: BlockId, asValues: Boolean): Option[Any] = {
+  private def doGetRemote(blockId: BlockId, asBlockResult: Boolean): Option[Any] = {
     require(blockId != null, "BlockId is null")
     val locations = Random.shuffle(master.getLocations(blockId))
     for (loc <- locations) {
@@ -498,8 +510,11 @@ private[spark] class BlockManager(
       val data = BlockManagerWorker.syncGetBlock(
         GetBlock(blockId), ConnectionManagerId(loc.host, loc.port))
       if (data != null) {
-        if (asValues) {
-          return Some(dataDeserialize(blockId, data))
+        if (asBlockResult) {
+          return Some(new BlockResult(
+            dataDeserialize(blockId, data),
+            DataReadMethod.Network,
+            data.limit()))
         } else {
           return Some(data)
         }
@@ -513,7 +528,7 @@ private[spark] class BlockManager(
   /**
    * Get a block from the block manager (either local or remote).
    */
-  def get(blockId: BlockId): Option[Iterator[Any]] = {
+  def get(blockId: BlockId): Option[BlockResult] = {
     val local = getLocal(blockId)
     if (local.isDefined) {
       logInfo(s"Found block $blockId locally")
@@ -792,7 +807,7 @@ private[spark] class BlockManager(
    * Read a block consisting of a single object.
    */
   def getSingle(blockId: BlockId): Option[Any] = {
-    get(blockId).map(_.next())
+    get(blockId).map(_.data.next())
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/7b71a0e0/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala b/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
index a107c51..328be15 100644
--- a/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
@@ -78,7 +78,7 @@ private[spark] object ThreadingTest {
         val startTime = System.currentTimeMillis()
         manager.get(blockId) match {
           case Some(retrievedBlock) =>
-            assert(retrievedBlock.toList.asInstanceOf[List[Int]] == block.toList,
+            assert(retrievedBlock.data.toList.asInstanceOf[List[Int]] == block.toList,
               "Block " + blockId + " did not match")
             println("Got block " + blockId + " in " +
               (System.currentTimeMillis - startTime) + " ms")

http://git-wip-us.apache.org/repos/asf/spark/blob/7b71a0e0/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
index 6cfc46c..9625337 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
@@ -72,6 +72,7 @@ private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") {
     "Complete Tasks",
     "Total Tasks",
     "Task Time",
+    "Input Bytes",
     "Shuffle Read",
     "Shuffle Write")
 
@@ -97,6 +98,7 @@ private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") {
       <td>{values("Complete Tasks")}</td>
       <td>{values("Total Tasks")}</td>
       <td sorttable_customkey={values("Task Time")}>{Utils.msDurationToString(values("Task Time").toLong)}</td>
+      <td sorttable_customkey={values("Input Bytes")}>{Utils.bytesToString(values("Input Bytes").toLong)}</td>
       <td sorttable_customkey={values("Shuffle Read")}>{Utils.bytesToString(values("Shuffle Read").toLong)}</td>
       <td sorttable_customkey={values("Shuffle Write")} >{Utils.bytesToString(values("Shuffle Write").toLong)}</td>
     </tr>
@@ -119,6 +121,7 @@ private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") {
     val completedTasks = listener.executorToTasksComplete.getOrElse(execId, 0)
     val totalTasks = activeTasks + failedTasks + completedTasks
     val totalDuration = listener.executorToDuration.getOrElse(execId, 0)
+    val totalInputBytes = listener.executorToInputBytes.getOrElse(execId, 0)
     val totalShuffleRead = listener.executorToShuffleRead.getOrElse(execId, 0)
     val totalShuffleWrite = listener.executorToShuffleWrite.getOrElse(execId, 0)
 
@@ -136,6 +139,7 @@ private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") {
       completedTasks,
       totalTasks,
       totalDuration,
+      totalInputBytes,
       totalShuffleRead,
       totalShuffleWrite,
       maxMem

http://git-wip-us.apache.org/repos/asf/spark/blob/7b71a0e0/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
index 91d37b8..58eeb86 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
@@ -46,6 +46,7 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener)
   val executorToTasksComplete = HashMap[String, Int]()
   val executorToTasksFailed = HashMap[String, Int]()
   val executorToDuration = HashMap[String, Long]()
+  val executorToInputBytes = HashMap[String, Long]()
   val executorToShuffleRead = HashMap[String, Long]()
   val executorToShuffleWrite = HashMap[String, Long]()
 
@@ -72,6 +73,10 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener)
       // Update shuffle read/write
       val metrics = taskEnd.taskMetrics
       if (metrics != null) {
+        metrics.inputMetrics.foreach { inputMetrics =>
+          executorToInputBytes(eid) =
+            executorToInputBytes.getOrElse(eid, 0L) + inputMetrics.bytesRead
+        }
         metrics.shuffleReadMetrics.foreach { shuffleRead =>
           executorToShuffleRead(eid) =
             executorToShuffleRead.getOrElse(eid, 0L) + shuffleRead.remoteBytesRead

http://git-wip-us.apache.org/repos/asf/spark/blob/7b71a0e0/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala
index 2aaf632..c4a8996 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala
@@ -28,6 +28,7 @@ class ExecutorSummary {
   var taskTime : Long = 0
   var failedTasks : Int = 0
   var succeededTasks : Int = 0
+  var inputBytes: Long = 0
   var shuffleRead : Long = 0
   var shuffleWrite : Long = 0
   var memoryBytesSpilled : Long = 0

http://git-wip-us.apache.org/repos/asf/spark/blob/7b71a0e0/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
index add0e98..2a34a9a 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
@@ -43,6 +43,7 @@ private[ui] class ExecutorTable(stageId: Int, parent: JobProgressTab) {
         <th>Total Tasks</th>
         <th>Failed Tasks</th>
         <th>Succeeded Tasks</th>
+        <th>Input Bytes</th>
         <th>Shuffle Read</th>
         <th>Shuffle Write</th>
         <th>Shuffle Spill (Memory)</th>
@@ -75,6 +76,7 @@ private[ui] class ExecutorTable(stageId: Int, parent: JobProgressTab) {
             <td>{v.failedTasks + v.succeededTasks}</td>
             <td>{v.failedTasks}</td>
             <td>{v.succeededTasks}</td>
+            <td sorttable_customekey={v.inputBytes.toString}>{Utils.bytesToString(v.inputBytes)}</td>
             <td sorttable_customekey={v.shuffleRead.toString}>{Utils.bytesToString(v.shuffleRead)}</td>
             <td sorttable_customekey={v.shuffleWrite.toString}>{Utils.bytesToString(v.shuffleWrite)}</td>
             <td sorttable_customekey={v.memoryBytesSpilled.toString} >{Utils.bytesToString(v.memoryBytesSpilled)}</td>

http://git-wip-us.apache.org/repos/asf/spark/blob/7b71a0e0/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index 381a544..2286a7f 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -46,13 +46,9 @@ class JobProgressListener(conf: SparkConf) extends SparkListener {
   val completedStages = ListBuffer[StageInfo]()
   val failedStages = ListBuffer[StageInfo]()
 
-  // Total metrics reflect metrics only for completed tasks
-  var totalTime = 0L
-  var totalShuffleRead = 0L
-  var totalShuffleWrite = 0L
-
   // TODO: Should probably consolidate all following into a single hash map.
   val stageIdToTime = HashMap[Int, Long]()
+  val stageIdToInputBytes = HashMap[Int, Long]()
   val stageIdToShuffleRead = HashMap[Int, Long]()
   val stageIdToShuffleWrite = HashMap[Int, Long]()
   val stageIdToMemoryBytesSpilled = HashMap[Int, Long]()
@@ -93,6 +89,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener {
       val toRemove = math.max(retainedStages / 10, 1)
       stages.take(toRemove).foreach { s =>
         stageIdToTime.remove(s.stageId)
+        stageIdToInputBytes.remove(s.stageId)
         stageIdToShuffleRead.remove(s.stageId)
         stageIdToShuffleWrite.remove(s.stageId)
         stageIdToMemoryBytesSpilled.remove(s.stageId)
@@ -171,6 +168,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener {
 
           val metrics = taskEnd.taskMetrics
           if (metrics != null) {
+            metrics.inputMetrics.foreach { y.inputBytes += _.bytesRead }
             metrics.shuffleReadMetrics.foreach { y.shuffleRead += _.remoteBytesRead }
             metrics.shuffleWriteMetrics.foreach { y.shuffleWrite += _.shuffleBytesWritten }
             y.memoryBytesSpilled += metrics.memoryBytesSpilled
@@ -200,18 +198,19 @@ class JobProgressListener(conf: SparkConf) extends SparkListener {
       stageIdToTime.getOrElseUpdate(sid, 0L)
       val time = metrics.map(_.executorRunTime).getOrElse(0L)
       stageIdToTime(sid) += time
-      totalTime += time
+
+      stageIdToInputBytes.getOrElseUpdate(sid, 0L)
+      val inputBytes = metrics.flatMap(_.inputMetrics).map(_.bytesRead).getOrElse(0L)
+      stageIdToInputBytes(sid) += inputBytes
 
       stageIdToShuffleRead.getOrElseUpdate(sid, 0L)
       val shuffleRead = metrics.flatMap(_.shuffleReadMetrics).map(_.remoteBytesRead).getOrElse(0L)
       stageIdToShuffleRead(sid) += shuffleRead
-      totalShuffleRead += shuffleRead
 
       stageIdToShuffleWrite.getOrElseUpdate(sid, 0L)
       val shuffleWrite =
         metrics.flatMap(_.shuffleWriteMetrics).map(_.shuffleBytesWritten).getOrElse(0L)
       stageIdToShuffleWrite(sid) += shuffleWrite
-      totalShuffleWrite += shuffleWrite
 
       stageIdToMemoryBytesSpilled.getOrElseUpdate(sid, 0L)
       val memoryBytesSpilled = metrics.map(_.memoryBytesSpilled).getOrElse(0L)

http://git-wip-us.apache.org/repos/asf/spark/blob/7b71a0e0/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 8e3d5d1..afb8ed7 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -48,6 +48,8 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
       val tasks = listener.stageIdToTaskData(stageId).values.toSeq.sortBy(_.taskInfo.launchTime)
 
       val numCompleted = tasks.count(_.taskInfo.finished)
+      val inputBytes = listener.stageIdToInputBytes.getOrElse(stageId, 0L)
+      val hasInput = inputBytes > 0
       val shuffleReadBytes = listener.stageIdToShuffleRead.getOrElse(stageId, 0L)
       val hasShuffleRead = shuffleReadBytes > 0
       val shuffleWriteBytes = listener.stageIdToShuffleWrite.getOrElse(stageId, 0L)
@@ -69,6 +71,12 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
               <strong>Total task time across all tasks: </strong>
               {UIUtils.formatDuration(listener.stageIdToTime.getOrElse(stageId, 0L) + activeTime)}
             </li>
+            {if (hasInput)
+              <li>
+                <strong>Input: </strong>
+                {Utils.bytesToString(inputBytes)}
+              </li>
+            }
             {if (hasShuffleRead)
               <li>
                 <strong>Shuffle read: </strong>
@@ -98,13 +106,14 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
         Seq(
           "Index", "ID", "Attempt", "Status", "Locality Level", "Executor",
           "Launch Time", "Duration", "GC Time") ++
+        {if (hasInput) Seq("Input") else Nil} ++
         {if (hasShuffleRead) Seq("Shuffle Read")  else Nil} ++
         {if (hasShuffleWrite) Seq("Write Time", "Shuffle Write") else Nil} ++
         {if (hasBytesSpilled) Seq("Shuffle Spill (Memory)", "Shuffle Spill (Disk)") else Nil} ++
         Seq("Errors")
 
       val taskTable = UIUtils.listingTable(
-        taskHeaders, taskRow(hasShuffleRead, hasShuffleWrite, hasBytesSpilled), tasks)
+        taskHeaders, taskRow(hasInput, hasShuffleRead, hasShuffleWrite, hasBytesSpilled), tasks)
 
       // Excludes tasks which failed and have incomplete metrics
       val validTasks = tasks.filter(t => t.taskInfo.status == "SUCCESS" && t.taskMetrics.isDefined)
@@ -159,6 +168,11 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
           def getQuantileCols(data: Seq[Double]) =
             Distribution(data).get.getQuantiles().map(d => Utils.bytesToString(d.toLong))
 
+          val inputSizes = validTasks.map { case TaskUIData(_, metrics, _) =>
+            metrics.get.inputMetrics.map(_.bytesRead).getOrElse(0L).toDouble
+          }
+          val inputQuantiles = "Input" +: getQuantileCols(inputSizes)
+
           val shuffleReadSizes = validTasks.map { case TaskUIData(_, metrics, _) =>
             metrics.get.shuffleReadMetrics.map(_.remoteBytesRead).getOrElse(0L).toDouble
           }
@@ -186,6 +200,7 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
             serviceQuantiles,
             gettingResultQuantiles,
             schedulerDelayQuantiles,
+            if (hasInput) inputQuantiles else Nil,
             if (hasShuffleRead) shuffleReadQuantiles else Nil,
             if (hasShuffleWrite) shuffleWriteQuantiles else Nil,
             if (hasBytesSpilled) memoryBytesSpilledQuantiles else Nil,
@@ -209,8 +224,11 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
     }
   }
 
-  def taskRow(shuffleRead: Boolean, shuffleWrite: Boolean, bytesSpilled: Boolean)
-      (taskData: TaskUIData): Seq[Node] = {
+  def taskRow(
+      hasInput: Boolean,
+      hasShuffleRead: Boolean,
+      hasShuffleWrite: Boolean,
+      hasBytesSpilled: Boolean)(taskData: TaskUIData): Seq[Node] = {
     taskData match { case TaskUIData(info, metrics, errorMessage) =>
       val duration = if (info.status == "RUNNING") info.timeRunning(System.currentTimeMillis())
         else metrics.map(_.executorRunTime).getOrElse(1L)
@@ -219,6 +237,12 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
       val gcTime = metrics.map(_.jvmGCTime).getOrElse(0L)
       val serializationTime = metrics.map(_.resultSerializationTime).getOrElse(0L)
 
+      val maybeInput = metrics.flatMap(_.inputMetrics)
+      val inputSortable = maybeInput.map(_.bytesRead.toString).getOrElse("")
+      val inputReadable = maybeInput
+        .map(m => s"${Utils.bytesToString(m.bytesRead)} (${m.readMethod.toString.toLowerCase()})")
+        .getOrElse("")
+
       val maybeShuffleRead = metrics.flatMap(_.shuffleReadMetrics).map(_.remoteBytesRead)
       val shuffleReadSortable = maybeShuffleRead.map(_.toString).getOrElse("")
       val shuffleReadReadable = maybeShuffleRead.map(Utils.bytesToString).getOrElse("")
@@ -265,12 +289,17 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
           {if (serializationTime > 0) UIUtils.formatDuration(serializationTime) else ""}
         </td>
         -->
-        {if (shuffleRead) {
+        {if (hasInput) {
+          <td sorttable_customkey={inputSortable}>
+            {inputReadable}
+          </td>
+        }}
+        {if (hasShuffleRead) {
            <td sorttable_customkey={shuffleReadSortable}>
              {shuffleReadReadable}
            </td>
         }}
-        {if (shuffleWrite) {
+        {if (hasShuffleWrite) {
            <td sorttable_customkey={writeTimeSortable}>
              {writeTimeReadable}
            </td>
@@ -278,7 +307,7 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
              {shuffleWriteReadable}
            </td>
         }}
-        {if (bytesSpilled) {
+        {if (hasBytesSpilled) {
           <td sorttable_customkey={memoryBytesSpilledSortable}>
             {memoryBytesSpilledReadable}
           </td>

http://git-wip-us.apache.org/repos/asf/spark/blob/7b71a0e0/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
index b2b6cc6..a9ac6d5 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
@@ -43,6 +43,7 @@ private[ui] class StageTableBase(
     <th>Submitted</th>
     <th>Duration</th>
     <th>Tasks: Succeeded/Total</th>
+    <th>Input</th>
     <th>Shuffle Read</th>
     <th>Shuffle Write</th>
   }
@@ -123,6 +124,11 @@ private[ui] class StageTableBase(
       case _ => ""
     }
     val totalTasks = s.numTasks
+    val inputSortable = listener.stageIdToInputBytes.getOrElse(s.stageId, 0L)
+    val inputRead = inputSortable match {
+      case 0 => ""
+      case b => Utils.bytesToString(b)
+    }
     val shuffleReadSortable = listener.stageIdToShuffleRead.getOrElse(s.stageId, 0L)
     val shuffleRead = shuffleReadSortable match {
       case 0 => ""
@@ -150,6 +156,7 @@ private[ui] class StageTableBase(
     <td class="progress-cell">
       {makeProgressBar(startedTasks, completedTasks, failedTasks, totalTasks)}
     </td>
-    <td sorttable_customekey={shuffleReadSortable.toString}>{shuffleRead}</td>
-    <td sorttable_customekey={shuffleWriteSortable.toString}>{shuffleWrite}</td>
+    <td sorttable_customkey={inputSortable.toString}>{inputRead}</td>
+    <td sorttable_customkey={shuffleReadSortable.toString}>{shuffleRead}</td>
+    <td sorttable_customkey={shuffleWriteSortable.toString}>{shuffleWrite}</td>
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/7b71a0e0/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 6245b4b..26c9c9d 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -26,7 +26,8 @@ import org.json4s.DefaultFormats
 import org.json4s.JsonDSL._
 import org.json4s.JsonAST._
 
-import org.apache.spark.executor.{ShuffleReadMetrics, ShuffleWriteMetrics, TaskMetrics}
+import org.apache.spark.executor.{DataReadMethod, InputMetrics, ShuffleReadMetrics,
+  ShuffleWriteMetrics, TaskMetrics}
 import org.apache.spark.scheduler._
 import org.apache.spark.storage._
 import org.apache.spark._
@@ -213,6 +214,8 @@ private[spark] object JsonProtocol {
       taskMetrics.shuffleReadMetrics.map(shuffleReadMetricsToJson).getOrElse(JNothing)
     val shuffleWriteMetrics =
       taskMetrics.shuffleWriteMetrics.map(shuffleWriteMetricsToJson).getOrElse(JNothing)
+    val inputMetrics =
+      taskMetrics.inputMetrics.map(inputMetricsToJson).getOrElse(JNothing)
     val updatedBlocks =
       taskMetrics.updatedBlocks.map { blocks =>
         JArray(blocks.toList.map { case (id, status) =>
@@ -230,6 +233,7 @@ private[spark] object JsonProtocol {
     ("Disk Bytes Spilled" -> taskMetrics.diskBytesSpilled) ~
     ("Shuffle Read Metrics" -> shuffleReadMetrics) ~
     ("Shuffle Write Metrics" -> shuffleWriteMetrics) ~
+    ("Input Metrics" -> inputMetrics) ~
     ("Updated Blocks" -> updatedBlocks)
   }
 
@@ -247,6 +251,11 @@ private[spark] object JsonProtocol {
     ("Shuffle Write Time" -> shuffleWriteMetrics.shuffleWriteTime)
   }
 
+  def inputMetricsToJson(inputMetrics: InputMetrics): JValue = {
+    ("Data Read Method" -> inputMetrics.readMethod.toString) ~
+    ("Bytes Read" -> inputMetrics.bytesRead)
+  }
+
   def taskEndReasonToJson(taskEndReason: TaskEndReason): JValue = {
     val reason = Utils.getFormattedClassName(taskEndReason)
     val json = taskEndReason match {
@@ -528,6 +537,8 @@ private[spark] object JsonProtocol {
       Utils.jsonOption(json \ "Shuffle Read Metrics").map(shuffleReadMetricsFromJson)
     metrics.shuffleWriteMetrics =
       Utils.jsonOption(json \ "Shuffle Write Metrics").map(shuffleWriteMetricsFromJson)
+    metrics.inputMetrics =
+      Utils.jsonOption(json \ "Input Metrics").map(inputMetricsFromJson)
     metrics.updatedBlocks =
       Utils.jsonOption(json \ "Updated Blocks").map { value =>
         value.extract[List[JValue]].map { block =>
@@ -557,6 +568,13 @@ private[spark] object JsonProtocol {
     metrics
   }
 
+  def inputMetricsFromJson(json: JValue): InputMetrics = {
+    val metrics = new InputMetrics(
+      DataReadMethod.withName((json \ "Data Read Method").extract[String]))
+    metrics.bytesRead = (json \ "Bytes Read").extract[Long]
+    metrics
+  }
+
   def taskEndReasonFromJson(json: JValue): TaskEndReason = {
     val success = Utils.getFormattedClassName(Success)
     val resubmitted = Utils.getFormattedClassName(Resubmitted)

http://git-wip-us.apache.org/repos/asf/spark/blob/7b71a0e0/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
index 4f178db..7f5d0b0 100644
--- a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
@@ -22,6 +22,7 @@ import scala.collection.mutable.ArrayBuffer
 import org.scalatest.{BeforeAndAfter, FunSuite}
 import org.scalatest.mock.EasyMockSugar
 
+import org.apache.spark.executor.{DataReadMethod, TaskMetrics}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage._
 
@@ -66,7 +67,8 @@ class CacheManagerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar
 
   test("get cached rdd") {
     expecting {
-      blockManager.get(RDDBlockId(0, 0)).andReturn(Some(ArrayBuffer(5, 6, 7).iterator))
+      val result = new BlockResult(ArrayBuffer(5, 6, 7).iterator, DataReadMethod.Memory, 12)
+      blockManager.get(RDDBlockId(0, 0)).andReturn(Some(result))
     }
 
     whenExecuting(blockManager) {

http://git-wip-us.apache.org/repos/asf/spark/blob/7b71a0e0/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index 6df0a08..71f48e2 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -251,6 +251,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
       taskInfoMetrics.foreach { case (taskInfo, taskMetrics) =>
         taskMetrics.resultSize should be > (0l)
         if (stageInfo.rddInfos.exists(info => info.name == d2.name || info.name == d3.name)) {
+          taskMetrics.inputMetrics should not be ('defined)
           taskMetrics.shuffleWriteMetrics should be ('defined)
           taskMetrics.shuffleWriteMetrics.get.shuffleBytesWritten should be > (0l)
         }

http://git-wip-us.apache.org/repos/asf/spark/blob/7b71a0e0/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index d7dbe51..23cb690 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -31,11 +31,13 @@ import org.scalatest.concurrent.Timeouts._
 import org.scalatest.Matchers
 import org.scalatest.time.SpanSugar._
 
-import org.apache.spark.{MapOutputTrackerMaster, SecurityManager, SparkConf}
+import org.apache.spark.{MapOutputTrackerMaster, SecurityManager, SparkConf, SparkContext}
+import org.apache.spark.executor.DataReadMethod
 import org.apache.spark.scheduler.LiveListenerBus
 import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
 import org.apache.spark.util.{AkkaUtils, ByteBufferInputStream, SizeEstimator, Utils}
 
+import scala.collection.mutable.ArrayBuffer
 import scala.language.implicitConversions
 import scala.language.postfixOps
 
@@ -415,6 +417,39 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
     }
   }
 
+  test("correct BlockResult returned from get() calls") {
+    store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf, securityMgr,
+      mapOutputTracker)
+    val list1 = List(new Array[Byte](200), new Array[Byte](200))
+    val list1ForSizeEstimate = new ArrayBuffer[Any]
+    list1ForSizeEstimate ++= list1.iterator
+    val list1SizeEstimate = SizeEstimator.estimate(list1ForSizeEstimate)
+    val list2 = List(new Array[Byte](50), new Array[Byte](100), new Array[Byte](150))
+    val list2ForSizeEstimate = new ArrayBuffer[Any]
+    list2ForSizeEstimate ++= list2.iterator
+    val list2SizeEstimate = SizeEstimator.estimate(list2ForSizeEstimate)
+    store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+    store.put("list2memory", list2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+    store.put("list2disk", list2.iterator, StorageLevel.DISK_ONLY, tellMaster = true)
+    val list1Get = store.get("list1")
+    assert(list1Get.isDefined, "list1 expected to be in store")
+    assert(list1Get.get.data.size === 2)
+    assert(list1Get.get.inputMetrics.bytesRead === list1SizeEstimate)
+    assert(list1Get.get.inputMetrics.readMethod === DataReadMethod.Memory)
+    val list2MemoryGet = store.get("list2memory")
+    assert(list2MemoryGet.isDefined, "list2memory expected to be in store")
+    assert(list2MemoryGet.get.data.size === 3)
+    assert(list2MemoryGet.get.inputMetrics.bytesRead === list2SizeEstimate)
+    assert(list2MemoryGet.get.inputMetrics.readMethod === DataReadMethod.Memory)
+    val list2DiskGet = store.get("list2disk")
+    assert(list2DiskGet.isDefined, "list2disk expected to be in store")
+    assert(list2DiskGet.get.data.size === 3)
+    // Unlike the memory reads above, there is no size estimate to compare the disk read against.
+    // We don't know the exact size of the data on disk, but it should certainly be > 0.
+    assert(list2DiskGet.get.inputMetrics.bytesRead > 0)
+    assert(list2DiskGet.get.inputMetrics.readMethod === DataReadMethod.Disk)
+  }
+
   test("in-memory LRU storage") {
     store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf,
       securityMgr, mapOutputTracker)
@@ -630,18 +665,18 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
     store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
     store.put("list3", list3.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
     assert(store.get("list2").isDefined, "list2 was not in store")
-    assert(store.get("list2").get.size == 2)
+    assert(store.get("list2").get.data.size === 2)
     assert(store.get("list3").isDefined, "list3 was not in store")
-    assert(store.get("list3").get.size == 2)
+    assert(store.get("list3").get.data.size === 2)
     assert(store.get("list1") === None, "list1 was in store")
     assert(store.get("list2").isDefined, "list2 was not in store")
-    assert(store.get("list2").get.size == 2)
+    assert(store.get("list2").get.data.size === 2)
     // At this point list2 was gotten last, so LRU will getSingle rid of list3
     store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
     assert(store.get("list1").isDefined, "list1 was not in store")
-    assert(store.get("list1").get.size == 2)
+    assert(store.get("list1").get.data.size === 2)
     assert(store.get("list2").isDefined, "list2 was not in store")
-    assert(store.get("list2").get.size == 2)
+    assert(store.get("list2").get.data.size === 2)
     assert(store.get("list3") === None, "list1 was in store")
   }
 
@@ -656,28 +691,31 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
     store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY_SER, tellMaster = true)
     store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY_SER, tellMaster = true)
     store.put("list3", list3.iterator, StorageLevel.DISK_ONLY, tellMaster = true)
+    val listForSizeEstimate = new ArrayBuffer[Any]
+    listForSizeEstimate ++= list1.iterator
+    val listSize = SizeEstimator.estimate(listForSizeEstimate)
     // At this point LRU should not kick in because list3 is only on disk
-    assert(store.get("list1").isDefined, "list2 was not in store")
-    assert(store.get("list1").get.size === 2)
-    assert(store.get("list2").isDefined, "list3 was not in store")
-    assert(store.get("list2").get.size === 2)
-    assert(store.get("list3").isDefined, "list1 was not in store")
-    assert(store.get("list3").get.size === 2)
-    assert(store.get("list1").isDefined, "list2 was not in store")
-    assert(store.get("list1").get.size === 2)
-    assert(store.get("list2").isDefined, "list3 was not in store")
-    assert(store.get("list2").get.size === 2)
-    assert(store.get("list3").isDefined, "list1 was not in store")
-    assert(store.get("list3").get.size === 2)
+    assert(store.get("list1").isDefined, "list1 was not in store")
+    assert(store.get("list1").get.data.size === 2)
+    assert(store.get("list2").isDefined, "list2 was not in store")
+    assert(store.get("list2").get.data.size === 2)
+    assert(store.get("list3").isDefined, "list3 was not in store")
+    assert(store.get("list3").get.data.size === 2)
+    assert(store.get("list1").isDefined, "list1 was not in store")
+    assert(store.get("list1").get.data.size === 2)
+    assert(store.get("list2").isDefined, "list2 was not in store")
+    assert(store.get("list2").get.data.size === 2)
+    assert(store.get("list3").isDefined, "list3 was not in store")
+    assert(store.get("list3").get.data.size === 2)
     // Now let's add in list4, which uses both disk and memory; list1 should drop out
     store.put("list4", list4.iterator, StorageLevel.MEMORY_AND_DISK_SER, tellMaster = true)
     assert(store.get("list1") === None, "list1 was in store")
-    assert(store.get("list2").isDefined, "list3 was not in store")
-    assert(store.get("list2").get.size === 2)
-    assert(store.get("list3").isDefined, "list1 was not in store")
-    assert(store.get("list3").get.size === 2)
+    assert(store.get("list2").isDefined, "list2 was not in store")
+    assert(store.get("list2").get.data.size === 2)
+    assert(store.get("list3").isDefined, "list3 was not in store")
+    assert(store.get("list3").get.data.size === 2)
     assert(store.get("list4").isDefined, "list4 was not in store")
-    assert(store.get("list4").get.size === 2)
+    assert(store.get("list4").get.data.size === 2)
   }
 
   test("negative byte values in ByteBufferInputStream") {

http://git-wip-us.apache.org/repos/asf/spark/blob/7b71a0e0/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 6c49870..316e141 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -39,7 +39,11 @@ class JsonProtocolSuite extends FunSuite {
     val taskGettingResult =
       SparkListenerTaskGettingResult(makeTaskInfo(1000L, 2000, 5, 3000L, true))
     val taskEnd = SparkListenerTaskEnd(1, "ShuffleMapTask", Success,
-      makeTaskInfo(123L, 234, 67, 345L, false), makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800))
+      makeTaskInfo(123L, 234, 67, 345L, false),
+      makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800, hasHadoopInput = false))
+    val taskEndWithHadoopInput = SparkListenerTaskEnd(1, "ShuffleMapTask", Success,
+      makeTaskInfo(123L, 234, 67, 345L, false),
+      makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800, hasHadoopInput = true))
     val jobStart = SparkListenerJobStart(10, Seq[Int](1, 2, 3, 4), properties)
     val jobEnd = SparkListenerJobEnd(20, JobSucceeded)
     val environmentUpdate = SparkListenerEnvironmentUpdate(Map[String, Seq[(String, String)]](
@@ -61,6 +65,7 @@ class JsonProtocolSuite extends FunSuite {
     testEvent(taskStart, taskStartJsonString)
     testEvent(taskGettingResult, taskGettingResultJsonString)
     testEvent(taskEnd, taskEndJsonString)
+    testEvent(taskEndWithHadoopInput, taskEndWithHadoopInputJsonString)
     testEvent(jobStart, jobStartJsonString)
     testEvent(jobEnd, jobEndJsonString)
     testEvent(environmentUpdate, environmentUpdateJsonString)
@@ -75,7 +80,7 @@ class JsonProtocolSuite extends FunSuite {
     testRDDInfo(makeRddInfo(2, 3, 4, 5L, 6L))
     testStageInfo(makeStageInfo(10, 20, 30, 40L, 50L))
     testTaskInfo(makeTaskInfo(999L, 888, 55, 777L, false))
-    testTaskMetrics(makeTaskMetrics(33333L, 44444L, 55555L, 66666L, 7, 8))
+    testTaskMetrics(makeTaskMetrics(33333L, 44444L, 55555L, 66666L, 7, 8, hasHadoopInput = false))
     testBlockManagerId(BlockManagerId("Hong", "Kong", 500, 1000))
 
     // StorageLevel
@@ -118,7 +123,7 @@ class JsonProtocolSuite extends FunSuite {
     testBlockId(StreamBlockId(1, 2L))
   }
 
-  test("Backward compatibility") {
+  test("StageInfo.details backward compatibility") {
     // StageInfo.details was added after 1.0.0.
     val info = makeStageInfo(1, 2, 3, 4L, 5L)
     assert(info.details.nonEmpty)
@@ -129,6 +134,16 @@ class JsonProtocolSuite extends FunSuite {
     assert("" === newInfo.details)
   }
 
+  test("InputMetrics backward compatibility") {
+    // InputMetrics were added after 1.0.1.
+    val metrics = makeTaskMetrics(1L, 2L, 3L, 4L, 5, 6, hasHadoopInput = true)
+    assert(metrics.inputMetrics.nonEmpty)
+    val newJson = JsonProtocol.taskMetricsToJson(metrics)
+    val oldJson = newJson.removeField { case (field, _) => field == "Input Metrics" }
+    val newMetrics = JsonProtocol.taskMetricsFromJson(oldJson)
+    assert(newMetrics.inputMetrics.isEmpty)
+  }
+
 
   /** -------------------------- *
    | Helper test running methods |
@@ -294,6 +309,8 @@ class JsonProtocolSuite extends FunSuite {
       metrics1.shuffleReadMetrics, metrics2.shuffleReadMetrics, assertShuffleReadEquals)
     assertOptionEquals(
       metrics1.shuffleWriteMetrics, metrics2.shuffleWriteMetrics, assertShuffleWriteEquals)
+    assertOptionEquals(
+      metrics1.inputMetrics, metrics2.inputMetrics, assertInputMetricsEquals)
     assertOptionEquals(metrics1.updatedBlocks, metrics2.updatedBlocks, assertBlocksEquals)
   }
 
@@ -311,6 +328,11 @@ class JsonProtocolSuite extends FunSuite {
     assert(metrics1.shuffleWriteTime === metrics2.shuffleWriteTime)
   }
 
+  private def assertEquals(metrics1: InputMetrics, metrics2: InputMetrics) {
+    assert(metrics1.readMethod === metrics2.readMethod)
+    assert(metrics1.bytesRead === metrics2.bytesRead)
+  }
+
   private def assertEquals(bm1: BlockManagerId, bm2: BlockManagerId) {
     assert(bm1.executorId === bm2.executorId)
     assert(bm1.host === bm2.host)
@@ -403,6 +425,10 @@ class JsonProtocolSuite extends FunSuite {
     assertEquals(w1, w2)
   }
 
+  private def assertInputMetricsEquals(i1: InputMetrics, i2: InputMetrics) {
+    assertEquals(i1, i2)
+  }
+
   private def assertTaskMetricsEquals(t1: TaskMetrics, t2: TaskMetrics) {
     assertEquals(t1, t2)
   }
@@ -460,9 +486,19 @@ class JsonProtocolSuite extends FunSuite {
     new TaskInfo(a, b, c, d, "executor", "your kind sir", TaskLocality.NODE_LOCAL, speculative)
   }
 
-  private def makeTaskMetrics(a: Long, b: Long, c: Long, d: Long, e: Int, f: Int) = {
+  /**
+   * Creates a TaskMetrics object describing a task that read data from Hadoop (if hasHadoopInput is
+   * set to true) or read data from a shuffle otherwise.
+   */
+  private def makeTaskMetrics(
+      a: Long,
+      b: Long,
+      c: Long,
+      d: Long,
+      e: Int,
+      f: Int,
+      hasHadoopInput: Boolean) = {
     val t = new TaskMetrics
-    val sr = new ShuffleReadMetrics
     val sw = new ShuffleWriteMetrics
     t.hostname = "localhost"
     t.executorDeserializeTime = a
@@ -471,15 +507,23 @@ class JsonProtocolSuite extends FunSuite {
     t.jvmGCTime = d
     t.resultSerializationTime = a + b
     t.memoryBytesSpilled = a + c
-    sr.shuffleFinishTime = b + c
-    sr.totalBlocksFetched = e + f
-    sr.remoteBytesRead = b + d
-    sr.localBlocksFetched = e
-    sr.fetchWaitTime = a + d
-    sr.remoteBlocksFetched = f
+
+    if (hasHadoopInput) {
+      val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
+      inputMetrics.bytesRead = d + e + f
+      t.inputMetrics = Some(inputMetrics)
+    } else {
+      val sr = new ShuffleReadMetrics
+      sr.shuffleFinishTime = b + c
+      sr.totalBlocksFetched = e + f
+      sr.remoteBytesRead = b + d
+      sr.localBlocksFetched = e
+      sr.fetchWaitTime = a + d
+      sr.remoteBlocksFetched = f
+      t.shuffleReadMetrics = Some(sr)
+    }
     sw.shuffleBytesWritten = a + b + c
     sw.shuffleWriteTime = b + c + d
-    t.shuffleReadMetrics = Some(sr)
     t.shuffleWriteMetrics = Some(sw)
     // Make at most 6 blocks
     t.updatedBlocks = Some((1 to (e % 5 + 1)).map { i =>
@@ -552,8 +596,9 @@ class JsonProtocolSuite extends FunSuite {
       |  },
       |  "Shuffle Write Metrics":{
       |    "Shuffle Bytes Written":1200,
-      |    "Shuffle Write Time":1500},
-      |    "Updated Blocks":[
+      |    "Shuffle Write Time":1500
+      |  },
+      |  "Updated Blocks":[
       |    {"Block ID":"rdd_0_0",
       |      "Status":{
       |        "Storage Level":{
@@ -568,6 +613,35 @@ class JsonProtocolSuite extends FunSuite {
       |}
     """.stripMargin
 
+  private val taskEndWithHadoopInputJsonString =
+    """
+      |{"Event":"SparkListenerTaskEnd","Stage ID":1,"Task Type":"ShuffleMapTask",
+      |"Task End Reason":{"Reason":"Success"},
+      |"Task Info":{
+      |  "Task ID":123,"Index":234,"Attempt":67,"Launch Time":345,"Executor ID":"executor",
+      |  "Host":"your kind sir","Locality":"NODE_LOCAL","Speculative":false,
+      |  "Getting Result Time":0,"Finish Time":0,"Failed":false,"Serialized Size":0
+      |},
+      |"Task Metrics":{
+      |  "Host Name":"localhost","Executor Deserialize Time":300,"Executor Run Time":400,
+      |  "Result Size":500,"JVM GC Time":600,"Result Serialization Time":700,
+      |  "Memory Bytes Spilled":800,"Disk Bytes Spilled":0,
+      |  "Shuffle Write Metrics":{"Shuffle Bytes Written":1200,"Shuffle Write Time":1500},
+      |  "Input Metrics":{"Data Read Method":"Hadoop","Bytes Read":2100},
+      |  "Updated Blocks":[
+      |    {"Block ID":"rdd_0_0",
+      |      "Status":{
+      |        "Storage Level":{
+      |          "Use Disk":true,"Use Memory":true,"Use Tachyon":false,"Deserialized":false,
+      |          "Replication":2
+      |        },
+      |        "Memory Size":0,"Tachyon Size":0,"Disk Size":0
+      |      }
+      |    }
+      |  ]}
+      |}
+    """.stripMargin
+
   private val jobStartJsonString =
     """
       {"Event":"SparkListenerJobStart","Job ID":10,"Stage IDs":[1,2,3,4],"Properties":