Posted to commits@spark.apache.org by va...@apache.org on 2019/03/05 18:41:15 UTC

[spark] branch master updated: [SPARK-27012][CORE] Storage tab shows rdd details even after executor ended

This is an automated email from the ASF dual-hosted git repository.

vanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 6207360  [SPARK-27012][CORE] Storage tab shows rdd details even after executor ended
6207360 is described below

commit 6207360b00ed5e888ac460e6cf9de7cdd2478bff
Author: Ajith <aj...@gmail.com>
AuthorDate: Tue Mar 5 10:40:38 2019 -0800

    [SPARK-27012][CORE] Storage tab shows rdd details even after executor ended
    
    ## What changes were proposed in this pull request?
    
    After we cache a table, its details appear in the Storage tab of the Spark UI. If an executor has shut down (graceful shutdown / dynamic allocation scenario), the UI still shows the RDD as cached, and clicking the link throws an error. This is because, on the executor-removed event, we fail to adjust the RDD partition details in org.apache.spark.status.AppStatusListener#onExecutorRemoved.
    
    ## How was this patch tested?
    
    Tested this fix manually in the UI.
    Edit: added a unit test.
    
    Closes #23920 from ajithme/cachestorage.
    
    Authored-by: Ajith <aj...@gmail.com>
    Signed-off-by: Marcelo Vanzin <va...@cloudera.com>
---
 .../apache/spark/status/AppStatusListener.scala    |  24 +++++
 .../spark/status/AppStatusListenerSuite.scala      | 100 +++++++++++++++++++++
 2 files changed, 124 insertions(+)
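
For context, here is a minimal sketch of how the stale Storage tab entry can show up, not part of the patch: run with dynamic allocation enabled and allow executors holding cached blocks to be released. The app name and timeout values below are illustrative; the config keys are standard Spark settings.

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.storage.StorageLevel

    // Sketch: cache an RDD, then let dynamic allocation reclaim the idle
    // executors that hold its blocks. Timeouts here are only examples.
    val spark = SparkSession.builder()
      .appName("SPARK-27012-repro")
      .config("spark.dynamicAllocation.enabled", "true")
      .config("spark.shuffle.service.enabled", "true")
      .config("spark.dynamicAllocation.executorIdleTimeout", "30s")
      .config("spark.dynamicAllocation.cachedExecutorIdleTimeout", "30s")
      .getOrCreate()

    val rdd = spark.sparkContext.parallelize(1 to 1000, 4)
    rdd.persist(StorageLevel.MEMORY_AND_DISK)
    rdd.count()  // materialize the cache so it appears in the Storage tab

    // Once the idle timeouts expire the executors are removed; before this
    // commit the Storage tab still listed the RDD blocks on those executors.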

diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
index a8b2153..fc64bef 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
@@ -211,6 +211,30 @@ private[spark] class AppStatusListener(
           update(rdd, now)
         }
       }
+      // Remove all RDD partitions that reference the removed executor
+      liveRDDs.values.foreach { rdd =>
+        rdd.getPartitions.values
+          .filter(_.executors.contains(event.executorId))
+          .foreach { partition =>
+            if (partition.executors.length == 1) {
+              rdd.removePartition(partition.blockName)
+              rdd.memoryUsed = addDeltaToValue(rdd.memoryUsed, partition.memoryUsed * -1)
+              rdd.diskUsed = addDeltaToValue(rdd.diskUsed, partition.diskUsed * -1)
+            } else {
+              rdd.memoryUsed = addDeltaToValue(rdd.memoryUsed,
+                (partition.memoryUsed / partition.executors.length) * -1)
+              rdd.diskUsed = addDeltaToValue(rdd.diskUsed,
+                (partition.diskUsed / partition.executors.length) * -1)
+              partition.update(partition.executors
+                .filter(!_.equals(event.executorId)), rdd.storageLevel,
+                addDeltaToValue(partition.memoryUsed,
+                  (partition.memoryUsed / partition.executors.length) * -1),
+                addDeltaToValue(partition.diskUsed,
+                  (partition.diskUsed / partition.executors.length) * -1))
+            }
+          }
+        update(rdd, now)
+      }
       if (isExecutorActiveForLiveStages(exec)) {
         // the executor was running for a currently active stage, so save it for now in
         // deadExecutors, and remove when there are no active stages overlapping with the
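
For intuition, the hunk above charges each executor an equal share of a replicated partition: when the executor holding one replica goes away, the RDD and partition totals drop by memoryUsed / executors.length (and diskUsed likewise), while a partition whose only replica lived on the removed executor is dropped entirely. A simplified arithmetic sketch, using the block sizes from the test added below:

    // Illustrative arithmetic only (values borrowed from the new test case):
    // rdd_1_1 is replicated on executors "1" and "2" with 1 byte in memory each;
    // rdd_1_2 lives only on executor "1".
    val partMem   = 2L                   // rdd_1_1: 1 byte per replica, two replicas
    val replicas  = 2
    val share     = partMem / replicas   // 1 byte charged to the removed executor
    val partAfter = partMem - share      // 1 byte left, owned by executor "2"
    // rdd_1_2 had its single replica on the removed executor, so its whole
    // partition entry is removed rather than scaled down.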
diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
index 9f51bd7..b580066 100644
--- a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
@@ -1520,6 +1520,106 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
     }
   }
 
+  test("storage information on executor lost/down") {
+    val listener = new AppStatusListener(store, conf, true)
+    val maxMemory = 42L
+
+    // Register a couple of block managers.
+    val bm1 = BlockManagerId("1", "1.example.com", 42)
+    val bm2 = BlockManagerId("2", "2.example.com", 84)
+    Seq(bm1, bm2).foreach { bm =>
+      listener.onExecutorAdded(SparkListenerExecutorAdded(1L, bm.executorId,
+        new ExecutorInfo(bm.host, 1, Map.empty, Map.empty)))
+      listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm, maxMemory))
+    }
+
+    val rdd1b1 = RddBlock(1, 1, 1L, 2L)
+    val rdd1b2 = RddBlock(1, 2, 3L, 4L)
+    val level = StorageLevel.MEMORY_AND_DISK
+
+    // Submit a stage and make sure the RDDs are recorded.
+    val rdd1Info = new RDDInfo(rdd1b1.rddId, "rdd1", 2, level, Nil)
+    val stage = new StageInfo(1, 0, "stage1", 4, Seq(rdd1Info), Nil, "details1")
+    listener.onStageSubmitted(SparkListenerStageSubmitted(stage, new Properties()))
+
+    // Add partition 1 replicated on two block managers.
+    listener.onBlockUpdated(SparkListenerBlockUpdated(
+      BlockUpdatedInfo(bm1, rdd1b1.blockId, level, rdd1b1.memSize, rdd1b1.diskSize)))
+
+    listener.onBlockUpdated(SparkListenerBlockUpdated(
+      BlockUpdatedInfo(bm2, rdd1b1.blockId, level, rdd1b1.memSize, rdd1b1.diskSize)))
+
+    // Add a second partition only to bm 1.
+    listener.onBlockUpdated(SparkListenerBlockUpdated(
+      BlockUpdatedInfo(bm1, rdd1b2.blockId, level, rdd1b2.memSize, rdd1b2.diskSize)))
+
+    check[RDDStorageInfoWrapper](rdd1b1.rddId) { wrapper =>
+      assert(wrapper.info.numCachedPartitions === 2L)
+      assert(wrapper.info.memoryUsed === 2 * rdd1b1.memSize + rdd1b2.memSize)
+      assert(wrapper.info.diskUsed === 2 * rdd1b1.diskSize + rdd1b2.diskSize)
+      assert(wrapper.info.dataDistribution.get.size === 2L)
+      assert(wrapper.info.partitions.get.size === 2L)
+
+      val dist = wrapper.info.dataDistribution.get.find(_.address == bm1.hostPort).get
+      assert(dist.memoryUsed === rdd1b1.memSize + rdd1b2.memSize)
+      assert(dist.diskUsed === rdd1b1.diskSize + rdd1b2.diskSize)
+      assert(dist.memoryRemaining === maxMemory - dist.memoryUsed)
+
+      val part1 = wrapper.info.partitions.get.find(_.blockName === rdd1b1.blockId.name).get
+      assert(part1.storageLevel === level.description)
+      assert(part1.memoryUsed === 2 * rdd1b1.memSize)
+      assert(part1.diskUsed === 2 * rdd1b1.diskSize)
+      assert(part1.executors === Seq(bm1.executorId, bm2.executorId))
+
+      val part2 = wrapper.info.partitions.get.find(_.blockName === rdd1b2.blockId.name).get
+      assert(part2.storageLevel === level.description)
+      assert(part2.memoryUsed === rdd1b2.memSize)
+      assert(part2.diskUsed === rdd1b2.diskSize)
+      assert(part2.executors === Seq(bm1.executorId))
+    }
+
+    check[ExecutorSummaryWrapper](bm1.executorId) { exec =>
+      assert(exec.info.rddBlocks === 2L)
+      assert(exec.info.memoryUsed === rdd1b1.memSize + rdd1b2.memSize)
+      assert(exec.info.diskUsed === rdd1b1.diskSize + rdd1b2.diskSize)
+    }
+
+    // Remove Executor 1.
+    listener.onExecutorRemoved(createExecutorRemovedEvent(1))
+
+    // Check that the partition info now contains only details about what remains on bm2
+    check[RDDStorageInfoWrapper](rdd1b1.rddId) { wrapper =>
+      assert(wrapper.info.numCachedPartitions === 1L)
+      assert(wrapper.info.memoryUsed === rdd1b1.memSize)
+      assert(wrapper.info.diskUsed === rdd1b1.diskSize)
+      assert(wrapper.info.dataDistribution.get.size === 1L)
+      assert(wrapper.info.partitions.get.size === 1L)
+
+      val dist = wrapper.info.dataDistribution.get.find(_.address == bm2.hostPort).get
+      assert(dist.memoryUsed === rdd1b1.memSize)
+      assert(dist.diskUsed === rdd1b1.diskSize)
+      assert(dist.memoryRemaining === maxMemory - dist.memoryUsed)
+
+      val part = wrapper.info.partitions.get.find(_.blockName === rdd1b1.blockId.name).get
+      assert(part.storageLevel === level.description)
+      assert(part.memoryUsed === rdd1b1.memSize)
+      assert(part.diskUsed === rdd1b1.diskSize)
+      assert(part.executors === Seq(bm2.executorId))
+    }
+
+    // Remove Executor 2.
+    listener.onExecutorRemoved(createExecutorRemovedEvent(2))
+    // Check that storage usage is zero since both executors are down
+    check[RDDStorageInfoWrapper](rdd1b1.rddId) { wrapper =>
+      assert(wrapper.info.numCachedPartitions === 0)
+      assert(wrapper.info.memoryUsed === 0)
+      assert(wrapper.info.diskUsed === 0)
+      assert(wrapper.info.dataDistribution.isEmpty)
+      assert(wrapper.info.partitions.get.isEmpty)
+    }
+  }
+
+
   private def key(stage: StageInfo): Array[Int] = Array(stage.stageId, stage.attemptNumber)
 
   private def check[T: ClassTag](key: Any)(fn: T => Unit): Unit = {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org