You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ad...@apache.org on 2014/07/20 23:45:47 UTC

git commit: SPARK-2564. ShuffleReadMetrics.totalBlocksRead is redundant

Repository: spark
Updated Branches:
  refs/heads/master 1b10b8114 -> 9564f8548


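SPARK-2564. ShuffleReadMetrics.totalBlocksRead is redundant

Note: in the code the field is named `totalBlocksFetched` (not `totalBlocksRead`).
This change stops storing it separately and derives it as
`remoteBlocksFetched + localBlocksFetched`; the corresponding
`BlockFetcherIterator.totalBlocks` accessor and the "Total Blocks Fetched"
JSON field are removed as well.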

Author: Sandy Ryza <sa...@cloudera.com>

Closes #1474 from sryza/sandy-spark-2564 and squashes the following commits:

35b8388 [Sandy Ryza] Fix compile error on upmerge
7b985fb [Sandy Ryza] Fix test compile error
43f79e6 [Sandy Ryza] SPARK-2564. ShuffleReadMetrics.totalBlocksRead is redundant


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9564f854
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9564f854
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9564f854

Branch: refs/heads/master
Commit: 9564f8548917f563930d5e87911a304bf206d26e
Parents: 1b10b81
Author: Sandy Ryza <sa...@cloudera.com>
Authored: Sun Jul 20 14:45:34 2014 -0700
Committer: Aaron Davidson <aa...@databricks.com>
Committed: Sun Jul 20 14:45:34 2014 -0700

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala  | 3 +--
 .../org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala | 1 -
 .../scala/org/apache/spark/storage/BlockFetcherIterator.scala    | 4 +---
 core/src/main/scala/org/apache/spark/util/JsonProtocol.scala     | 2 --
 .../src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala | 3 ---
 5 files changed, 2 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9564f854/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index 5d59e00..21fe643 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -99,7 +99,6 @@ class TaskMetrics extends Serializable {
         existingMetrics.fetchWaitTime += newMetrics.fetchWaitTime
         existingMetrics.localBlocksFetched += newMetrics.localBlocksFetched
         existingMetrics.remoteBlocksFetched += newMetrics.remoteBlocksFetched
-        existingMetrics.totalBlocksFetched += newMetrics.totalBlocksFetched
         existingMetrics.remoteBytesRead += newMetrics.remoteBytesRead
       case None =>
         _shuffleReadMetrics = Some(newMetrics)
@@ -149,7 +148,7 @@ class ShuffleReadMetrics extends Serializable {
   /**
    * Number of blocks fetched in this shuffle by this task (remote or local)
    */
-  var totalBlocksFetched: Int = _
+  def totalBlocksFetched: Int = remoteBlocksFetched + localBlocksFetched
 
   /**
    * Number of remote blocks fetched in this shuffle by this task

http://git-wip-us.apache.org/repos/asf/spark/blob/9564f854/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala
index 3795994..9978882 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala
@@ -81,7 +81,6 @@ private[hash] object BlockStoreShuffleFetcher extends Logging {
       shuffleMetrics.shuffleFinishTime = System.currentTimeMillis
       shuffleMetrics.fetchWaitTime = blockFetcherItr.fetchWaitTime
       shuffleMetrics.remoteBytesRead = blockFetcherItr.remoteBytesRead
-      shuffleMetrics.totalBlocksFetched = blockFetcherItr.totalBlocks
       shuffleMetrics.localBlocksFetched = blockFetcherItr.numLocalBlocks
       shuffleMetrics.remoteBlocksFetched = blockFetcherItr.numRemoteBlocks
       context.taskMetrics.updateShuffleReadMetrics(shuffleMetrics)

http://git-wip-us.apache.org/repos/asf/spark/blob/9564f854/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
index 2f0296c..69905a9 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
@@ -46,7 +46,6 @@ import org.apache.spark.util.Utils
 private[storage]
 trait BlockFetcherIterator extends Iterator[(BlockId, Option[Iterator[Any]])] with Logging {
   def initialize()
-  def totalBlocks: Int
   def numLocalBlocks: Int
   def numRemoteBlocks: Int
   def fetchWaitTime: Long
@@ -192,7 +191,7 @@ object BlockFetcherIterator {
         }
       }
       logInfo("Getting " + _numBlocksToFetch + " non-empty blocks out of " +
-        totalBlocks + " blocks")
+        (numLocal + numRemote) + " blocks")
       remoteRequests
     }
 
@@ -235,7 +234,6 @@ object BlockFetcherIterator {
       logDebug("Got local blocks in " + Utils.getUsedTimeMs(startTime) + " ms")
     }
 
-    override def totalBlocks: Int = numLocal + numRemote
     override def numLocalBlocks: Int = numLocal
     override def numRemoteBlocks: Int = numRemote
     override def fetchWaitTime: Long = _fetchWaitTime

http://git-wip-us.apache.org/repos/asf/spark/blob/9564f854/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 2ff8b25..3448aaa 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -237,7 +237,6 @@ private[spark] object JsonProtocol {
 
   def shuffleReadMetricsToJson(shuffleReadMetrics: ShuffleReadMetrics): JValue = {
     ("Shuffle Finish Time" -> shuffleReadMetrics.shuffleFinishTime) ~
-    ("Total Blocks Fetched" -> shuffleReadMetrics.totalBlocksFetched) ~
     ("Remote Blocks Fetched" -> shuffleReadMetrics.remoteBlocksFetched) ~
     ("Local Blocks Fetched" -> shuffleReadMetrics.localBlocksFetched) ~
     ("Fetch Wait Time" -> shuffleReadMetrics.fetchWaitTime) ~
@@ -548,7 +547,6 @@ private[spark] object JsonProtocol {
   def shuffleReadMetricsFromJson(json: JValue): ShuffleReadMetrics = {
     val metrics = new ShuffleReadMetrics
     metrics.shuffleFinishTime = (json \ "Shuffle Finish Time").extract[Long]
-    metrics.totalBlocksFetched = (json \ "Total Blocks Fetched").extract[Int]
     metrics.remoteBlocksFetched = (json \ "Remote Blocks Fetched").extract[Int]
     metrics.localBlocksFetched = (json \ "Local Blocks Fetched").extract[Int]
     metrics.fetchWaitTime = (json \ "Fetch Wait Time").extract[Long]

http://git-wip-us.apache.org/repos/asf/spark/blob/9564f854/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 11f70a6..9305b6d 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -314,7 +314,6 @@ class JsonProtocolSuite extends FunSuite {
 
   private def assertEquals(metrics1: ShuffleReadMetrics, metrics2: ShuffleReadMetrics) {
     assert(metrics1.shuffleFinishTime === metrics2.shuffleFinishTime)
-    assert(metrics1.totalBlocksFetched === metrics2.totalBlocksFetched)
     assert(metrics1.remoteBlocksFetched === metrics2.remoteBlocksFetched)
     assert(metrics1.localBlocksFetched === metrics2.localBlocksFetched)
     assert(metrics1.fetchWaitTime === metrics2.fetchWaitTime)
@@ -513,7 +512,6 @@ class JsonProtocolSuite extends FunSuite {
     } else {
       val sr = new ShuffleReadMetrics
       sr.shuffleFinishTime = b + c
-      sr.totalBlocksFetched = e + f
       sr.remoteBytesRead = b + d
       sr.localBlocksFetched = e
       sr.fetchWaitTime = a + d
@@ -584,7 +582,6 @@ class JsonProtocolSuite extends FunSuite {
       |  "Memory Bytes Spilled":800,"Disk Bytes Spilled":0,
       |  "Shuffle Read Metrics":{
       |    "Shuffle Finish Time":900,
-      |    "Total Blocks Fetched":1500,
       |    "Remote Blocks Fetched":800,
       |    "Local Blocks Fetched":700,
       |    "Fetch Wait Time":900,