Posted to commits@spark.apache.org by va...@apache.org on 2018/02/13 14:54:25 UTC

spark git commit: [SPARK-20659][CORE] Removing sc.getExecutorStorageStatus and making StorageStatus private

Repository: spark
Updated Branches:
  refs/heads/master 300c40f50 -> 116c581d2


[SPARK-20659][CORE] Removing sc.getExecutorStorageStatus and making StorageStatus private

## What changes were proposed in this pull request?

In this PR, StorageStatus is made private and simplified a bit; moreover, the SparkContext.getExecutorStorageStatus method is removed. StorageStatus itself is kept because it is still used from SparkContext.getRDDStorageInfo.
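
For callers that used the old API only to inspect cached RDD blocks, a rough migration sketch through the remaining public API (assuming a running SparkContext named `sc` and a cached RDD named `rdd`; these names are illustrative) is:

```scala
// Count the cached partitions of `rdd` without touching the now-private
// StorageStatus, mirroring what the updated tests in this change do.
val cachedPartitions = sc.getRDDStorageInfo
  .filter(_.id == rdd.id)
  .map(_.numCachedPartitions)
  .sum
```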

As a replacement for the SparkContext.getExecutorStorageStatus method, the executor infos are extended with additional memory metrics: usedOnHeapStorageMemory, usedOffHeapStorageMemory, totalOnHeapStorageMemory and totalOffHeapStorageMemory.
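
A minimal sketch of reading the new metrics (again assuming a running SparkContext named `sc`) goes through the public status tracker:

```scala
// Print the storage-memory metrics now exposed on SparkExecutorInfo.
sc.statusTracker.getExecutorInfos.foreach { exec =>
  println(s"${exec.host()}:${exec.port()} " +
    s"usedOnHeap=${exec.usedOnHeapStorageMemory()} " +
    s"usedOffHeap=${exec.usedOffHeapStorageMemory()} " +
    s"totalOnHeap=${exec.totalOnHeapStorageMemory()} " +
    s"totalOffHeap=${exec.totalOffHeapStorageMemory()}")
}
```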

## How was this patch tested?

By running existing unit tests.

Author: "attilapiros" <pi...@gmail.com>
Author: Attila Zsolt Piros <20...@users.noreply.github.com>

Closes #20546 from attilapiros/SPARK-20659.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/116c581d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/116c581d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/116c581d

Branch: refs/heads/master
Commit: 116c581d2658571d38f8b9b27a516ef517170589
Parents: 300c40f
Author: "attilapiros" <pi...@gmail.com>
Authored: Tue Feb 13 06:54:15 2018 -0800
Committer: Marcelo Vanzin <va...@cloudera.com>
Committed: Tue Feb 13 06:54:15 2018 -0800

----------------------------------------------------------------------
 .../org/apache/spark/SparkExecutorInfo.java     |   4 +
 .../scala/org/apache/spark/SparkContext.scala   |  19 +-
 .../org/apache/spark/SparkStatusTracker.scala   |   9 +-
 .../scala/org/apache/spark/StatusAPIImpl.scala  |   6 +-
 .../org/apache/spark/storage/StorageUtils.scala | 119 +---------
 .../org/apache/spark/DistributedSuite.scala     |   7 +-
 .../StandaloneDynamicAllocationSuite.scala      |   2 +-
 .../org/apache/spark/storage/StorageSuite.scala | 219 -------------------
 project/MimaExcludes.scala                      |  14 ++
 .../apache/spark/repl/SingletonReplSuite.scala  |   6 +-
 10 files changed, 44 insertions(+), 361 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/116c581d/core/src/main/java/org/apache/spark/SparkExecutorInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/SparkExecutorInfo.java b/core/src/main/java/org/apache/spark/SparkExecutorInfo.java
index dc3e826..2b93385 100644
--- a/core/src/main/java/org/apache/spark/SparkExecutorInfo.java
+++ b/core/src/main/java/org/apache/spark/SparkExecutorInfo.java
@@ -30,4 +30,8 @@ public interface SparkExecutorInfo extends Serializable {
   int port();
   long cacheSize();
   int numRunningTasks();
+  long usedOnHeapStorageMemory();
+  long usedOffHeapStorageMemory();
+  long totalOnHeapStorageMemory();
+  long totalOffHeapStorageMemory();
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/116c581d/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 3828d4f..c4f74c4 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1715,7 +1715,13 @@ class SparkContext(config: SparkConf) extends Logging {
   private[spark] def getRDDStorageInfo(filter: RDD[_] => Boolean): Array[RDDInfo] = {
     assertNotStopped()
     val rddInfos = persistentRdds.values.filter(filter).map(RDDInfo.fromRdd).toArray
-    StorageUtils.updateRddInfo(rddInfos, getExecutorStorageStatus)
+    rddInfos.foreach { rddInfo =>
+      val rddId = rddInfo.id
+      val rddStorageInfo = statusStore.asOption(statusStore.rdd(rddId))
+      rddInfo.numCachedPartitions = rddStorageInfo.map(_.numCachedPartitions).getOrElse(0)
+      rddInfo.memSize = rddStorageInfo.map(_.memoryUsed).getOrElse(0L)
+      rddInfo.diskSize = rddStorageInfo.map(_.diskUsed).getOrElse(0L)
+    }
     rddInfos.filter(_.isCached)
   }
 
@@ -1728,17 +1734,6 @@ class SparkContext(config: SparkConf) extends Logging {
 
   /**
    * :: DeveloperApi ::
-   * Return information about blocks stored in all of the slaves
-   */
-  @DeveloperApi
-  @deprecated("This method may change or be removed in a future release.", "2.2.0")
-  def getExecutorStorageStatus: Array[StorageStatus] = {
-    assertNotStopped()
-    env.blockManager.master.getStorageStatus
-  }
-
-  /**
-   * :: DeveloperApi ::
    * Return pools for fair scheduler
    */
   @DeveloperApi

http://git-wip-us.apache.org/repos/asf/spark/blob/116c581d/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala b/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala
index 70865cb..815237e 100644
--- a/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala
+++ b/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala
@@ -97,7 +97,8 @@ class SparkStatusTracker private[spark] (sc: SparkContext, store: AppStatusStore
   }
 
   /**
-   * Returns information of all known executors, including host, port, cacheSize, numRunningTasks.
+   * Returns information of all known executors, including host, port, cacheSize, numRunningTasks
+   * and memory metrics.
    */
   def getExecutorInfos: Array[SparkExecutorInfo] = {
     store.executorList(true).map { exec =>
@@ -113,7 +114,11 @@ class SparkStatusTracker private[spark] (sc: SparkContext, store: AppStatusStore
         host,
         port,
         cachedMem,
-        exec.activeTasks)
+        exec.activeTasks,
+        exec.memoryMetrics.map(_.usedOffHeapStorageMemory).getOrElse(0L),
+        exec.memoryMetrics.map(_.usedOnHeapStorageMemory).getOrElse(0L),
+        exec.memoryMetrics.map(_.totalOffHeapStorageMemory).getOrElse(0L),
+        exec.memoryMetrics.map(_.totalOnHeapStorageMemory).getOrElse(0L))
     }.toArray
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/116c581d/core/src/main/scala/org/apache/spark/StatusAPIImpl.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/StatusAPIImpl.scala b/core/src/main/scala/org/apache/spark/StatusAPIImpl.scala
index c1f24a6..6a888c1 100644
--- a/core/src/main/scala/org/apache/spark/StatusAPIImpl.scala
+++ b/core/src/main/scala/org/apache/spark/StatusAPIImpl.scala
@@ -38,5 +38,9 @@ private class SparkExecutorInfoImpl(
     val host: String,
     val port: Int,
     val cacheSize: Long,
-    val numRunningTasks: Int)
+    val numRunningTasks: Int,
+    val usedOnHeapStorageMemory: Long,
+    val usedOffHeapStorageMemory: Long,
+    val totalOnHeapStorageMemory: Long,
+    val totalOffHeapStorageMemory: Long)
   extends SparkExecutorInfo

http://git-wip-us.apache.org/repos/asf/spark/blob/116c581d/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
index e9694fd..adc406b 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
@@ -24,19 +24,15 @@ import scala.collection.mutable
 
 import sun.nio.ch.DirectBuffer
 
-import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.internal.Logging
 
 /**
- * :: DeveloperApi ::
  * Storage information for each BlockManager.
  *
  * This class assumes BlockId and BlockStatus are immutable, such that the consumers of this
  * class cannot mutate the source of the information. Accesses are not thread-safe.
  */
-@DeveloperApi
-@deprecated("This class may be removed or made private in a future release.", "2.2.0")
-class StorageStatus(
+private[spark] class StorageStatus(
     val blockManagerId: BlockManagerId,
     val maxMemory: Long,
     val maxOnHeapMem: Option[Long],
@@ -44,9 +40,6 @@ class StorageStatus(
 
   /**
    * Internal representation of the blocks stored in this block manager.
-   *
-   * We store RDD blocks and non-RDD blocks separately to allow quick retrievals of RDD blocks.
-   * These collections should only be mutated through the add/update/removeBlock methods.
    */
   private val _rddBlocks = new mutable.HashMap[Int, mutable.Map[BlockId, BlockStatus]]
   private val _nonRddBlocks = new mutable.HashMap[BlockId, BlockStatus]
@@ -87,9 +80,6 @@ class StorageStatus(
    */
   def rddBlocks: Map[BlockId, BlockStatus] = _rddBlocks.flatMap { case (_, blocks) => blocks }
 
-  /** Return the blocks that belong to the given RDD stored in this block manager. */
-  def rddBlocksById(rddId: Int): Map[BlockId, BlockStatus] = _rddBlocks.getOrElse(rddId, Map.empty)
-
   /** Add the given block to this storage status. If it already exists, overwrite it. */
   private[spark] def addBlock(blockId: BlockId, blockStatus: BlockStatus): Unit = {
     updateStorageInfo(blockId, blockStatus)
@@ -101,46 +91,6 @@ class StorageStatus(
     }
   }
 
-  /** Update the given block in this storage status. If it doesn't already exist, add it. */
-  private[spark] def updateBlock(blockId: BlockId, blockStatus: BlockStatus): Unit = {
-    addBlock(blockId, blockStatus)
-  }
-
-  /** Remove the given block from this storage status. */
-  private[spark] def removeBlock(blockId: BlockId): Option[BlockStatus] = {
-    updateStorageInfo(blockId, BlockStatus.empty)
-    blockId match {
-      case RDDBlockId(rddId, _) =>
-        // Actually remove the block, if it exists
-        if (_rddBlocks.contains(rddId)) {
-          val removed = _rddBlocks(rddId).remove(blockId)
-          // If the given RDD has no more blocks left, remove the RDD
-          if (_rddBlocks(rddId).isEmpty) {
-            _rddBlocks.remove(rddId)
-          }
-          removed
-        } else {
-          None
-        }
-      case _ =>
-        _nonRddBlocks.remove(blockId)
-    }
-  }
-
-  /**
-   * Return whether the given block is stored in this block manager in O(1) time.
-   *
-   * @note This is much faster than `this.blocks.contains`, which is O(blocks) time.
-   */
-  def containsBlock(blockId: BlockId): Boolean = {
-    blockId match {
-      case RDDBlockId(rddId, _) =>
-        _rddBlocks.get(rddId).exists(_.contains(blockId))
-      case _ =>
-        _nonRddBlocks.contains(blockId)
-    }
-  }
-
   /**
    * Return the given block stored in this block manager in O(1) time.
    *
@@ -155,37 +105,12 @@ class StorageStatus(
     }
   }
 
-  /**
-   * Return the number of blocks stored in this block manager in O(RDDs) time.
-   *
-   * @note This is much faster than `this.blocks.size`, which is O(blocks) time.
-   */
-  def numBlocks: Int = _nonRddBlocks.size + numRddBlocks
-
-  /**
-   * Return the number of RDD blocks stored in this block manager in O(RDDs) time.
-   *
-   * @note This is much faster than `this.rddBlocks.size`, which is O(RDD blocks) time.
-   */
-  def numRddBlocks: Int = _rddBlocks.values.map(_.size).sum
-
-  /**
-   * Return the number of blocks that belong to the given RDD in O(1) time.
-   *
-   * @note This is much faster than `this.rddBlocksById(rddId).size`, which is
-   * O(blocks in this RDD) time.
-   */
-  def numRddBlocksById(rddId: Int): Int = _rddBlocks.get(rddId).map(_.size).getOrElse(0)
-
   /** Return the max memory can be used by this block manager. */
   def maxMem: Long = maxMemory
 
   /** Return the memory remaining in this block manager. */
   def memRemaining: Long = maxMem - memUsed
 
-  /** Return the memory used by caching RDDs */
-  def cacheSize: Long = onHeapCacheSize.getOrElse(0L) + offHeapCacheSize.getOrElse(0L)
-
   /** Return the memory used by this block manager. */
   def memUsed: Long = onHeapMemUsed.getOrElse(0L) + offHeapMemUsed.getOrElse(0L)
 
@@ -220,15 +145,9 @@ class StorageStatus(
   /** Return the disk space used by this block manager. */
   def diskUsed: Long = _nonRddStorageInfo.diskUsage + _rddBlocks.keys.toSeq.map(diskUsedByRdd).sum
 
-  /** Return the memory used by the given RDD in this block manager in O(1) time. */
-  def memUsedByRdd(rddId: Int): Long = _rddStorageInfo.get(rddId).map(_.memoryUsage).getOrElse(0L)
-
   /** Return the disk space used by the given RDD in this block manager in O(1) time. */
   def diskUsedByRdd(rddId: Int): Long = _rddStorageInfo.get(rddId).map(_.diskUsage).getOrElse(0L)
 
-  /** Return the storage level, if any, used by the given RDD in this block manager. */
-  def rddStorageLevel(rddId: Int): Option[StorageLevel] = _rddStorageInfo.get(rddId).map(_.level)
-
   /**
    * Update the relevant storage info, taking into account any existing status for this block.
    */
@@ -295,40 +214,4 @@ private[spark] object StorageUtils extends Logging {
       cleaner.clean()
     }
   }
-
-  /**
-   * Update the given list of RDDInfo with the given list of storage statuses.
-   * This method overwrites the old values stored in the RDDInfo's.
-   */
-  def updateRddInfo(rddInfos: Seq[RDDInfo], statuses: Seq[StorageStatus]): Unit = {
-    rddInfos.foreach { rddInfo =>
-      val rddId = rddInfo.id
-      // Assume all blocks belonging to the same RDD have the same storage level
-      val storageLevel = statuses
-        .flatMap(_.rddStorageLevel(rddId)).headOption.getOrElse(StorageLevel.NONE)
-      val numCachedPartitions = statuses.map(_.numRddBlocksById(rddId)).sum
-      val memSize = statuses.map(_.memUsedByRdd(rddId)).sum
-      val diskSize = statuses.map(_.diskUsedByRdd(rddId)).sum
-
-      rddInfo.storageLevel = storageLevel
-      rddInfo.numCachedPartitions = numCachedPartitions
-      rddInfo.memSize = memSize
-      rddInfo.diskSize = diskSize
-    }
-  }
-
-  /**
-   * Return a mapping from block ID to its locations for each block that belongs to the given RDD.
-   */
-  def getRddBlockLocations(rddId: Int, statuses: Seq[StorageStatus]): Map[BlockId, Seq[String]] = {
-    val blockLocations = new mutable.HashMap[BlockId, mutable.ListBuffer[String]]
-    statuses.foreach { status =>
-      status.rddBlocksById(rddId).foreach { case (bid, _) =>
-        val location = status.blockManagerId.hostPort
-        blockLocations.getOrElseUpdate(bid, mutable.ListBuffer.empty) += location
-      }
-    }
-    blockLocations
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/116c581d/core/src/test/scala/org/apache/spark/DistributedSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/DistributedSuite.scala b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
index e09d5f5..28ea0c6 100644
--- a/core/src/test/scala/org/apache/spark/DistributedSuite.scala
+++ b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
@@ -160,11 +160,8 @@ class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContex
     val data = sc.parallelize(1 to 1000, 10)
     val cachedData = data.persist(storageLevel)
     assert(cachedData.count === 1000)
-    assert(sc.getExecutorStorageStatus.map(_.rddBlocksById(cachedData.id).size).sum ===
-      storageLevel.replication * data.getNumPartitions)
-    assert(cachedData.count === 1000)
-    assert(cachedData.count === 1000)
-
+    assert(sc.getRDDStorageInfo.filter(_.id == cachedData.id).map(_.numCachedPartitions).sum ===
+      data.getNumPartitions)
     // Get all the locations of the first partition and try to fetch the partitions
     // from those locations.
     val blockIds = data.partitions.indices.map(index => RDDBlockId(data.id, index)).toArray

http://git-wip-us.apache.org/repos/asf/spark/blob/116c581d/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
index bf7480d..c21ee7d 100644
--- a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
@@ -610,7 +610,7 @@ class StandaloneDynamicAllocationSuite
    * we submit a request to kill them. This must be called before each kill request.
    */
   private def syncExecutors(sc: SparkContext): Unit = {
-    val driverExecutors = sc.getExecutorStorageStatus
+    val driverExecutors = sc.env.blockManager.master.getStorageStatus
       .map(_.blockManagerId.executorId)
       .filter { _ != SparkContext.DRIVER_IDENTIFIER}
     val masterExecutors = getExecutorIds(sc)

http://git-wip-us.apache.org/repos/asf/spark/blob/116c581d/core/src/test/scala/org/apache/spark/storage/StorageSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/StorageSuite.scala b/core/src/test/scala/org/apache/spark/storage/StorageSuite.scala
index da198f9..ca35238 100644
--- a/core/src/test/scala/org/apache/spark/storage/StorageSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/StorageSuite.scala
@@ -51,27 +51,6 @@ class StorageSuite extends SparkFunSuite {
     assert(status.diskUsed === 60L)
   }
 
-  test("storage status update non-RDD blocks") {
-    val status = storageStatus1
-    status.updateBlock(TestBlockId("foo"), BlockStatus(memAndDisk, 50L, 100L))
-    status.updateBlock(TestBlockId("fee"), BlockStatus(memAndDisk, 100L, 20L))
-    assert(status.blocks.size === 3)
-    assert(status.memUsed === 160L)
-    assert(status.memRemaining === 840L)
-    assert(status.diskUsed === 140L)
-  }
-
-  test("storage status remove non-RDD blocks") {
-    val status = storageStatus1
-    status.removeBlock(TestBlockId("foo"))
-    status.removeBlock(TestBlockId("faa"))
-    assert(status.blocks.size === 1)
-    assert(status.blocks.contains(TestBlockId("fee")))
-    assert(status.memUsed === 10L)
-    assert(status.memRemaining === 990L)
-    assert(status.diskUsed === 20L)
-  }
-
   // For testing add, update, remove, get, and contains etc. for both RDD and non-RDD blocks
   private def storageStatus2: StorageStatus = {
     val status = new StorageStatus(BlockManagerId("big", "dog", 1), 1000L, Some(1000L), Some(0L))
@@ -95,85 +74,6 @@ class StorageSuite extends SparkFunSuite {
     assert(status.rddBlocks.contains(RDDBlockId(2, 2)))
     assert(status.rddBlocks.contains(RDDBlockId(2, 3)))
     assert(status.rddBlocks.contains(RDDBlockId(2, 4)))
-    assert(status.rddBlocksById(0).size === 1)
-    assert(status.rddBlocksById(0).contains(RDDBlockId(0, 0)))
-    assert(status.rddBlocksById(1).size === 1)
-    assert(status.rddBlocksById(1).contains(RDDBlockId(1, 1)))
-    assert(status.rddBlocksById(2).size === 3)
-    assert(status.rddBlocksById(2).contains(RDDBlockId(2, 2)))
-    assert(status.rddBlocksById(2).contains(RDDBlockId(2, 3)))
-    assert(status.rddBlocksById(2).contains(RDDBlockId(2, 4)))
-    assert(status.memUsedByRdd(0) === 10L)
-    assert(status.memUsedByRdd(1) === 100L)
-    assert(status.memUsedByRdd(2) === 30L)
-    assert(status.diskUsedByRdd(0) === 20L)
-    assert(status.diskUsedByRdd(1) === 200L)
-    assert(status.diskUsedByRdd(2) === 80L)
-    assert(status.rddStorageLevel(0) === Some(memAndDisk))
-    assert(status.rddStorageLevel(1) === Some(memAndDisk))
-    assert(status.rddStorageLevel(2) === Some(memAndDisk))
-
-    // Verify default values for RDDs that don't exist
-    assert(status.rddBlocksById(10).isEmpty)
-    assert(status.memUsedByRdd(10) === 0L)
-    assert(status.diskUsedByRdd(10) === 0L)
-    assert(status.rddStorageLevel(10) === None)
-  }
-
-  test("storage status update RDD blocks") {
-    val status = storageStatus2
-    status.updateBlock(TestBlockId("dan"), BlockStatus(memAndDisk, 5000L, 0L))
-    status.updateBlock(RDDBlockId(0, 0), BlockStatus(memAndDisk, 0L, 0L))
-    status.updateBlock(RDDBlockId(2, 2), BlockStatus(memAndDisk, 0L, 1000L))
-    assert(status.blocks.size === 7)
-    assert(status.rddBlocks.size === 5)
-    assert(status.rddBlocksById(0).size === 1)
-    assert(status.rddBlocksById(1).size === 1)
-    assert(status.rddBlocksById(2).size === 3)
-    assert(status.memUsedByRdd(0) === 0L)
-    assert(status.memUsedByRdd(1) === 100L)
-    assert(status.memUsedByRdd(2) === 20L)
-    assert(status.diskUsedByRdd(0) === 0L)
-    assert(status.diskUsedByRdd(1) === 200L)
-    assert(status.diskUsedByRdd(2) === 1060L)
-  }
-
-  test("storage status remove RDD blocks") {
-    val status = storageStatus2
-    status.removeBlock(TestBlockId("man"))
-    status.removeBlock(RDDBlockId(1, 1))
-    status.removeBlock(RDDBlockId(2, 2))
-    status.removeBlock(RDDBlockId(2, 4))
-    assert(status.blocks.size === 3)
-    assert(status.rddBlocks.size === 2)
-    assert(status.rddBlocks.contains(RDDBlockId(0, 0)))
-    assert(status.rddBlocks.contains(RDDBlockId(2, 3)))
-    assert(status.rddBlocksById(0).size === 1)
-    assert(status.rddBlocksById(0).contains(RDDBlockId(0, 0)))
-    assert(status.rddBlocksById(1).size === 0)
-    assert(status.rddBlocksById(2).size === 1)
-    assert(status.rddBlocksById(2).contains(RDDBlockId(2, 3)))
-    assert(status.memUsedByRdd(0) === 10L)
-    assert(status.memUsedByRdd(1) === 0L)
-    assert(status.memUsedByRdd(2) === 10L)
-    assert(status.diskUsedByRdd(0) === 20L)
-    assert(status.diskUsedByRdd(1) === 0L)
-    assert(status.diskUsedByRdd(2) === 20L)
-  }
-
-  test("storage status containsBlock") {
-    val status = storageStatus2
-    // blocks that actually exist
-    assert(status.blocks.contains(TestBlockId("dan")) === status.containsBlock(TestBlockId("dan")))
-    assert(status.blocks.contains(TestBlockId("man")) === status.containsBlock(TestBlockId("man")))
-    assert(status.blocks.contains(RDDBlockId(0, 0)) === status.containsBlock(RDDBlockId(0, 0)))
-    assert(status.blocks.contains(RDDBlockId(1, 1)) === status.containsBlock(RDDBlockId(1, 1)))
-    assert(status.blocks.contains(RDDBlockId(2, 2)) === status.containsBlock(RDDBlockId(2, 2)))
-    assert(status.blocks.contains(RDDBlockId(2, 3)) === status.containsBlock(RDDBlockId(2, 3)))
-    assert(status.blocks.contains(RDDBlockId(2, 4)) === status.containsBlock(RDDBlockId(2, 4)))
-    // blocks that don't exist
-    assert(status.blocks.contains(TestBlockId("fan")) === status.containsBlock(TestBlockId("fan")))
-    assert(status.blocks.contains(RDDBlockId(100, 0)) === status.containsBlock(RDDBlockId(100, 0)))
   }
 
   test("storage status getBlock") {
@@ -191,40 +91,6 @@ class StorageSuite extends SparkFunSuite {
     assert(status.blocks.get(RDDBlockId(100, 0)) === status.getBlock(RDDBlockId(100, 0)))
   }
 
-  test("storage status num[Rdd]Blocks") {
-    val status = storageStatus2
-    assert(status.blocks.size === status.numBlocks)
-    assert(status.rddBlocks.size === status.numRddBlocks)
-    status.addBlock(TestBlockId("Foo"), BlockStatus(memAndDisk, 0L, 0L))
-    status.addBlock(RDDBlockId(4, 4), BlockStatus(memAndDisk, 0L, 0L))
-    status.addBlock(RDDBlockId(4, 8), BlockStatus(memAndDisk, 0L, 0L))
-    assert(status.blocks.size === status.numBlocks)
-    assert(status.rddBlocks.size === status.numRddBlocks)
-    assert(status.rddBlocksById(4).size === status.numRddBlocksById(4))
-    assert(status.rddBlocksById(10).size === status.numRddBlocksById(10))
-    status.updateBlock(TestBlockId("Foo"), BlockStatus(memAndDisk, 0L, 10L))
-    status.updateBlock(RDDBlockId(4, 0), BlockStatus(memAndDisk, 0L, 0L))
-    status.updateBlock(RDDBlockId(4, 8), BlockStatus(memAndDisk, 0L, 0L))
-    status.updateBlock(RDDBlockId(10, 10), BlockStatus(memAndDisk, 0L, 0L))
-    assert(status.blocks.size === status.numBlocks)
-    assert(status.rddBlocks.size === status.numRddBlocks)
-    assert(status.rddBlocksById(4).size === status.numRddBlocksById(4))
-    assert(status.rddBlocksById(10).size === status.numRddBlocksById(10))
-    assert(status.rddBlocksById(100).size === status.numRddBlocksById(100))
-    status.removeBlock(RDDBlockId(4, 0))
-    status.removeBlock(RDDBlockId(10, 10))
-    assert(status.blocks.size === status.numBlocks)
-    assert(status.rddBlocks.size === status.numRddBlocks)
-    assert(status.rddBlocksById(4).size === status.numRddBlocksById(4))
-    assert(status.rddBlocksById(10).size === status.numRddBlocksById(10))
-    // remove a block that doesn't exist
-    status.removeBlock(RDDBlockId(1000, 999))
-    assert(status.blocks.size === status.numBlocks)
-    assert(status.rddBlocks.size === status.numRddBlocks)
-    assert(status.rddBlocksById(4).size === status.numRddBlocksById(4))
-    assert(status.rddBlocksById(10).size === status.numRddBlocksById(10))
-    assert(status.rddBlocksById(1000).size === status.numRddBlocksById(1000))
-  }
 
   test("storage status memUsed, diskUsed, externalBlockStoreUsed") {
     val status = storageStatus2
@@ -237,17 +103,6 @@ class StorageSuite extends SparkFunSuite {
     status.addBlock(RDDBlockId(25, 25), BlockStatus(memAndDisk, 40L, 50L))
     assert(status.memUsed === actualMemUsed)
     assert(status.diskUsed === actualDiskUsed)
-    status.updateBlock(TestBlockId("dan"), BlockStatus(memAndDisk, 4L, 5L))
-    status.updateBlock(RDDBlockId(0, 0), BlockStatus(memAndDisk, 4L, 5L))
-    status.updateBlock(RDDBlockId(1, 1), BlockStatus(memAndDisk, 4L, 5L))
-    assert(status.memUsed === actualMemUsed)
-    assert(status.diskUsed === actualDiskUsed)
-    status.removeBlock(TestBlockId("fire"))
-    status.removeBlock(TestBlockId("man"))
-    status.removeBlock(RDDBlockId(2, 2))
-    status.removeBlock(RDDBlockId(2, 3))
-    assert(status.memUsed === actualMemUsed)
-    assert(status.diskUsed === actualDiskUsed)
   }
 
   // For testing StorageUtils.updateRddInfo and StorageUtils.getRddBlockLocations
@@ -273,65 +128,6 @@ class StorageSuite extends SparkFunSuite {
     Seq(info0, info1)
   }
 
-  test("StorageUtils.updateRddInfo") {
-    val storageStatuses = stockStorageStatuses
-    val rddInfos = stockRDDInfos
-    StorageUtils.updateRddInfo(rddInfos, storageStatuses)
-    assert(rddInfos(0).storageLevel === memAndDisk)
-    assert(rddInfos(0).numCachedPartitions === 5)
-    assert(rddInfos(0).memSize === 5L)
-    assert(rddInfos(0).diskSize === 10L)
-    assert(rddInfos(0).externalBlockStoreSize === 0L)
-    assert(rddInfos(1).storageLevel === memAndDisk)
-    assert(rddInfos(1).numCachedPartitions === 3)
-    assert(rddInfos(1).memSize === 3L)
-    assert(rddInfos(1).diskSize === 6L)
-    assert(rddInfos(1).externalBlockStoreSize === 0L)
-  }
-
-  test("StorageUtils.getRddBlockLocations") {
-    val storageStatuses = stockStorageStatuses
-    val blockLocations0 = StorageUtils.getRddBlockLocations(0, storageStatuses)
-    val blockLocations1 = StorageUtils.getRddBlockLocations(1, storageStatuses)
-    assert(blockLocations0.size === 5)
-    assert(blockLocations1.size === 3)
-    assert(blockLocations0.contains(RDDBlockId(0, 0)))
-    assert(blockLocations0.contains(RDDBlockId(0, 1)))
-    assert(blockLocations0.contains(RDDBlockId(0, 2)))
-    assert(blockLocations0.contains(RDDBlockId(0, 3)))
-    assert(blockLocations0.contains(RDDBlockId(0, 4)))
-    assert(blockLocations1.contains(RDDBlockId(1, 0)))
-    assert(blockLocations1.contains(RDDBlockId(1, 1)))
-    assert(blockLocations1.contains(RDDBlockId(1, 2)))
-    assert(blockLocations0(RDDBlockId(0, 0)) === Seq("dog:1"))
-    assert(blockLocations0(RDDBlockId(0, 1)) === Seq("dog:1"))
-    assert(blockLocations0(RDDBlockId(0, 2)) === Seq("duck:2"))
-    assert(blockLocations0(RDDBlockId(0, 3)) === Seq("duck:2"))
-    assert(blockLocations0(RDDBlockId(0, 4)) === Seq("cat:3"))
-    assert(blockLocations1(RDDBlockId(1, 0)) === Seq("duck:2"))
-    assert(blockLocations1(RDDBlockId(1, 1)) === Seq("duck:2"))
-    assert(blockLocations1(RDDBlockId(1, 2)) === Seq("cat:3"))
-  }
-
-  test("StorageUtils.getRddBlockLocations with multiple locations") {
-    val storageStatuses = stockStorageStatuses
-    storageStatuses(0).addBlock(RDDBlockId(1, 0), BlockStatus(memAndDisk, 1L, 2L))
-    storageStatuses(0).addBlock(RDDBlockId(0, 4), BlockStatus(memAndDisk, 1L, 2L))
-    storageStatuses(2).addBlock(RDDBlockId(0, 0), BlockStatus(memAndDisk, 1L, 2L))
-    val blockLocations0 = StorageUtils.getRddBlockLocations(0, storageStatuses)
-    val blockLocations1 = StorageUtils.getRddBlockLocations(1, storageStatuses)
-    assert(blockLocations0.size === 5)
-    assert(blockLocations1.size === 3)
-    assert(blockLocations0(RDDBlockId(0, 0)) === Seq("dog:1", "cat:3"))
-    assert(blockLocations0(RDDBlockId(0, 1)) === Seq("dog:1"))
-    assert(blockLocations0(RDDBlockId(0, 2)) === Seq("duck:2"))
-    assert(blockLocations0(RDDBlockId(0, 3)) === Seq("duck:2"))
-    assert(blockLocations0(RDDBlockId(0, 4)) === Seq("dog:1", "cat:3"))
-    assert(blockLocations1(RDDBlockId(1, 0)) === Seq("dog:1", "duck:2"))
-    assert(blockLocations1(RDDBlockId(1, 1)) === Seq("duck:2"))
-    assert(blockLocations1(RDDBlockId(1, 2)) === Seq("cat:3"))
-  }
-
   private val offheap = StorageLevel.OFF_HEAP
   // For testing add, update, remove, get, and contains etc. for both RDD and non-RDD onheap
   // and offheap blocks
@@ -373,21 +169,6 @@ class StorageSuite extends SparkFunSuite {
     status.addBlock(RDDBlockId(25, 25), BlockStatus(memAndDisk, 40L, 50L))
     assert(status.memUsed === actualMemUsed)
     assert(status.diskUsed === actualDiskUsed)
-
-    status.updateBlock(TestBlockId("dan"), BlockStatus(memAndDisk, 4L, 5L))
-    status.updateBlock(RDDBlockId(0, 0), BlockStatus(offheap, 4L, 0L))
-    status.updateBlock(RDDBlockId(1, 1), BlockStatus(offheap, 4L, 0L))
-    assert(status.memUsed === actualMemUsed)
-    assert(status.diskUsed === actualDiskUsed)
-    assert(status.onHeapMemUsed.get === actualOnHeapMemUsed)
-    assert(status.offHeapMemUsed.get === actualOffHeapMemUsed)
-
-    status.removeBlock(TestBlockId("fire"))
-    status.removeBlock(TestBlockId("man"))
-    status.removeBlock(RDDBlockId(2, 2))
-    status.removeBlock(RDDBlockId(2, 3))
-    assert(status.memUsed === actualMemUsed)
-    assert(status.diskUsed === actualDiskUsed)
   }
 
   private def storageStatus4: StorageStatus = {

http://git-wip-us.apache.org/repos/asf/spark/blob/116c581d/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index d35c50e..381f7b5 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -36,6 +36,20 @@ object MimaExcludes {
 
   // Exclude rules for 2.4.x
   lazy val v24excludes = v23excludes ++ Seq(
+    // [SPARK-20659] Remove StorageStatus, or make it private
+    ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.SparkExecutorInfo.totalOffHeapStorageMemory"),
+    ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.SparkExecutorInfo.usedOffHeapStorageMemory"),
+    ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.SparkExecutorInfo.usedOnHeapStorageMemory"),
+    ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.SparkExecutorInfo.totalOnHeapStorageMemory"),
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.SparkContext.getExecutorStorageStatus"),
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.storage.StorageStatus.numBlocks"),
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.storage.StorageStatus.numRddBlocks"),
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.storage.StorageStatus.containsBlock"),
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.storage.StorageStatus.rddBlocksById"),
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.storage.StorageStatus.numRddBlocksById"),
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.storage.StorageStatus.memUsedByRdd"),
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.storage.StorageStatus.cacheSize"),
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.storage.StorageStatus.rddStorageLevel")
   )
 
   // Exclude rules for 2.3.x

http://git-wip-us.apache.org/repos/asf/spark/blob/116c581d/repl/src/test/scala/org/apache/spark/repl/SingletonReplSuite.scala
----------------------------------------------------------------------
diff --git a/repl/src/test/scala/org/apache/spark/repl/SingletonReplSuite.scala b/repl/src/test/scala/org/apache/spark/repl/SingletonReplSuite.scala
index ec3d790..d49e0fd 100644
--- a/repl/src/test/scala/org/apache/spark/repl/SingletonReplSuite.scala
+++ b/repl/src/test/scala/org/apache/spark/repl/SingletonReplSuite.scala
@@ -350,7 +350,7 @@ class SingletonReplSuite extends SparkFunSuite {
       """
         |val timeout = 60000 // 60 seconds
         |val start = System.currentTimeMillis
-        |while(sc.getExecutorStorageStatus.size != 3 &&
+        |while(sc.statusTracker.getExecutorInfos.size != 3 &&
         |    (System.currentTimeMillis - start) < timeout) {
         |  Thread.sleep(10)
         |}
@@ -361,11 +361,11 @@ class SingletonReplSuite extends SparkFunSuite {
         |case class Foo(i: Int)
         |val ret = sc.parallelize((1 to 100).map(Foo), 10).persist(MEMORY_AND_DISK_2)
         |ret.count()
-        |val res = sc.getExecutorStorageStatus.map(s => s.rddBlocksById(ret.id).size).sum
+        |val res = sc.getRDDStorageInfo.filter(_.id == ret.id).map(_.numCachedPartitions).sum
       """.stripMargin)
     assertDoesNotContain("error:", output)
     assertDoesNotContain("Exception", output)
-    assertContains("res: Int = 20", output)
+    assertContains("res: Int = 10", output)
   }
 
   test("should clone and clean line object in ClosureCleaner") {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org