Posted to commits@spark.apache.org by ka...@apache.org on 2015/04/23 06:42:13 UTC

spark git commit: [SPARK-7046] Remove InputMetrics from BlockResult

Repository: spark
Updated Branches:
  refs/heads/master d20686066 -> 03e85b4a1


[SPARK-7046] Remove InputMetrics from BlockResult

This is a code cleanup.

The BlockResult class originally contained an InputMetrics object so that it could be used
directly as the InputMetrics for the whole task. The task's metrics are now copied out of this
object field by field, so its presence is confusing: it holds only partial input metrics (it
doesn't include the records read). Because the object is no longer useful (and is confusing),
it should be removed.
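
In essence, the change looks like this (a simplified before/after sketch extracted from the
diff below; InputMetrics and DataReadMethod are Spark-internal types from
org.apache.spark.executor):

    // Before: BlockResult eagerly wrapped its arguments in a partial InputMetrics.
    private[spark] class BlockResult(
        val data: Iterator[Any],
        readMethod: DataReadMethod.Value,
        bytes: Long) {
      val inputMetrics = new InputMetrics(readMethod)
      inputMetrics.incBytesRead(bytes)
    }

    // After: BlockResult just exposes the raw fields, and callers such as
    // CacheManager copy them into the task-level InputMetrics themselves:
    private[spark] class BlockResult(
        val data: Iterator[Any],
        val readMethod: DataReadMethod.Value,
        val bytes: Long)

    // Caller side (CacheManager), after the change:
    val existingMetrics = context.taskMetrics
      .getInputMetricsForReadMethod(blockResult.readMethod)
    existingMetrics.incBytesRead(blockResult.bytes)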

Author: Kay Ousterhout <ka...@gmail.com>

Closes #5627 from kayousterhout/SPARK-7046 and squashes the following commits:

bf64bbe [Kay Ousterhout] Import fix
a08ca19 [Kay Ousterhout] [SPARK-7046] Remove InputMetrics from BlockResult


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/03e85b4a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/03e85b4a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/03e85b4a

Branch: refs/heads/master
Commit: 03e85b4a11899f37424cd6e1f8d71f1d704c90bb
Parents: d206860
Author: Kay Ousterhout <ka...@gmail.com>
Authored: Wed Apr 22 21:42:09 2015 -0700
Committer: Kay Ousterhout <ka...@gmail.com>
Committed: Wed Apr 22 21:42:09 2015 -0700

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/CacheManager.scala |  5 ++---
 .../scala/org/apache/spark/storage/BlockManager.scala   |  9 +++------
 .../org/apache/spark/storage/BlockManagerSuite.scala    | 12 ++++++------
 3 files changed, 11 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/03e85b4a/core/src/main/scala/org/apache/spark/CacheManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala
index a96d754..4d20c73 100644
--- a/core/src/main/scala/org/apache/spark/CacheManager.scala
+++ b/core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -44,10 +44,9 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
     blockManager.get(key) match {
       case Some(blockResult) =>
         // Partition is already materialized, so just return its values
-        val inputMetrics = blockResult.inputMetrics
         val existingMetrics = context.taskMetrics
-          .getInputMetricsForReadMethod(inputMetrics.readMethod)
-        existingMetrics.incBytesRead(inputMetrics.bytesRead)
+          .getInputMetricsForReadMethod(blockResult.readMethod)
+        existingMetrics.incBytesRead(blockResult.bytes)
 
         val iter = blockResult.data.asInstanceOf[Iterator[T]]
         new InterruptibleIterator[T](context, iter) {

http://git-wip-us.apache.org/repos/asf/spark/blob/03e85b4a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 145a9c1..55718e5 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -29,7 +29,7 @@ import scala.util.Random
 import sun.nio.ch.DirectBuffer
 
 import org.apache.spark._
-import org.apache.spark.executor._
+import org.apache.spark.executor.{DataReadMethod, ShuffleWriteMetrics}
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.network._
 import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
@@ -50,11 +50,8 @@ private[spark] case class ArrayValues(buffer: Array[Any]) extends BlockValues
 /* Class for returning a fetched block and associated metrics. */
 private[spark] class BlockResult(
     val data: Iterator[Any],
-    readMethod: DataReadMethod.Value,
-    bytes: Long) {
-  val inputMetrics = new InputMetrics(readMethod)
-  inputMetrics.incBytesRead(bytes)
-}
+    val readMethod: DataReadMethod.Value,
+    val bytes: Long)
 
 /**
  * Manager running on every node (driver and executors) which provides interfaces for putting and

http://git-wip-us.apache.org/repos/asf/spark/blob/03e85b4a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 545722b..7d82a7c 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -428,19 +428,19 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfterEach
     val list1Get = store.get("list1")
     assert(list1Get.isDefined, "list1 expected to be in store")
     assert(list1Get.get.data.size === 2)
-    assert(list1Get.get.inputMetrics.bytesRead === list1SizeEstimate)
-    assert(list1Get.get.inputMetrics.readMethod === DataReadMethod.Memory)
+    assert(list1Get.get.bytes === list1SizeEstimate)
+    assert(list1Get.get.readMethod === DataReadMethod.Memory)
     val list2MemoryGet = store.get("list2memory")
     assert(list2MemoryGet.isDefined, "list2memory expected to be in store")
     assert(list2MemoryGet.get.data.size === 3)
-    assert(list2MemoryGet.get.inputMetrics.bytesRead === list2SizeEstimate)
-    assert(list2MemoryGet.get.inputMetrics.readMethod === DataReadMethod.Memory)
+    assert(list2MemoryGet.get.bytes === list2SizeEstimate)
+    assert(list2MemoryGet.get.readMethod === DataReadMethod.Memory)
     val list2DiskGet = store.get("list2disk")
     assert(list2DiskGet.isDefined, "list2disk expected to be in store")
     assert(list2DiskGet.get.data.size === 3)
     // We don't know the exact size of the data on disk, but it should certainly be > 0.
-    assert(list2DiskGet.get.inputMetrics.bytesRead > 0)
-    assert(list2DiskGet.get.inputMetrics.readMethod === DataReadMethod.Disk)
+    assert(list2DiskGet.get.bytes > 0)
+    assert(list2DiskGet.get.readMethod === DataReadMethod.Disk)
   }
 
   test("in-memory LRU storage") {

