Posted to commits@spark.apache.org by va...@apache.org on 2018/09/11 17:31:12 UTC

spark git commit: [SPARK-24889][CORE] Update block info when unpersist rdds

Repository: spark
Updated Branches:
  refs/heads/master bcb9a8c83 -> 14f3ad209


[SPARK-24889][CORE] Update block info when unpersist rdds

## What changes were proposed in this pull request?

Block info reported by executors is updated at certain points, for example when an RDD is cached. However, when an RDD is removed by unpersisting, no such update is requested, so the block info becomes stale.

There are a couple of options to fix this:

1. Ask to update block info when unpersisting

This is the simplest option, but it changes driver-executor communication slightly.

2. Update block info when processing the RDD unpersist event

A `SparkListenerUnpersistRDD` event is already sent when an RDD is unpersisted. When processing this event, we can update the block info for that RDD. This only changes event-processing code, so the risk seems lower.

This patch currently takes option 2 because of its lower risk; a simplified sketch of the approach is shown below. If we agree that the first option carries no risk, we can switch to it.
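
As an illustration, here is a minimal, self-contained sketch of the option 2 approach. The class and method names below are hypothetical stand-ins, not the actual `AppStatusListener` internals shown in the diff: when the unpersist event is processed, the RDD's cached per-executor usage is rolled out of the executor summaries, clamped at zero, without any extra driver-executor round trip.

```scala
// Hypothetical, simplified model of the event-processing approach (option 2).
case class RddDistribution(memoryUsed: Long, diskUsed: Long)

class ExecutorSummary(var rddBlocks: Int, var memoryUsed: Long, var diskUsed: Long)

class LiveRdd(
    val blocksPerExecutor: Map[String, Int],          // executor id -> cached block count
    val distributions: Map[String, RddDistribution])  // executor id -> memory/disk usage

class StatusListener(
    executors: scala.collection.mutable.Map[String, ExecutorSummary],
    liveRdds: scala.collection.mutable.Map[Int, LiveRdd]) {

  // Apply a delta to a value, but never let it go negative.
  private def addDeltaToValue(old: Long, delta: Long): Long = math.max(0L, old + delta)

  def onUnpersistRdd(rddId: Int): Unit = {
    liveRdds.remove(rddId).foreach { rdd =>
      // Subtract the RDD's block counts from each executor that held its partitions.
      rdd.blocksPerExecutor.foreach { case (execId, blocks) =>
        executors.get(execId).foreach(_.rddBlocks -= blocks)
      }
      // Subtract the RDD's memory and disk usage from each executor's summary.
      rdd.distributions.foreach { case (execId, dist) =>
        executors.get(execId).foreach { exec =>
          exec.memoryUsed = addDeltaToValue(exec.memoryUsed, -dist.memoryUsed)
          exec.diskUsed = addDeltaToValue(exec.diskUsed, -dist.diskUsed)
        }
      }
    }
  }
}
```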

## How was this patch tested?

Unit tests.

Closes #22341 from viirya/SPARK-24889.

Authored-by: Liang-Chi Hsieh <vi...@gmail.com>
Signed-off-by: Marcelo Vanzin <va...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/14f3ad20
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/14f3ad20
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/14f3ad20

Branch: refs/heads/master
Commit: 14f3ad20932535fe952428bf255e7eddd8fa1b58
Parents: bcb9a8c
Author: Liang-Chi Hsieh <vi...@gmail.com>
Authored: Tue Sep 11 10:31:06 2018 -0700
Committer: Marcelo Vanzin <va...@cloudera.com>
Committed: Tue Sep 11 10:31:06 2018 -0700

----------------------------------------------------------------------
 .../apache/spark/status/AppStatusListener.scala | 64 +++++++++++++++-----
 .../org/apache/spark/status/LiveEntity.scala    |  4 ++
 .../org/apache/spark/storage/RDDInfo.scala      |  2 +-
 .../spark/status/AppStatusListenerSuite.scala   | 29 +++++++++
 4 files changed, 82 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/14f3ad20/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
index 304d092..f21eee1 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
@@ -686,7 +686,37 @@ private[spark] class AppStatusListener(
   }
 
   override def onUnpersistRDD(event: SparkListenerUnpersistRDD): Unit = {
-    liveRDDs.remove(event.rddId)
+    liveRDDs.remove(event.rddId).foreach { liveRDD =>
+      val storageLevel = liveRDD.info.storageLevel
+
+      // Use RDD partition info to update executor block info.
+      liveRDD.getPartitions().foreach { case (_, part) =>
+        part.executors.foreach { executorId =>
+          liveExecutors.get(executorId).foreach { exec =>
+            exec.rddBlocks = exec.rddBlocks - 1
+          }
+        }
+      }
+
+      val now = System.nanoTime()
+
+      // Use RDD distribution to update executor memory and disk usage info.
+      liveRDD.getDistributions().foreach { case (executorId, rddDist) =>
+        liveExecutors.get(executorId).foreach { exec =>
+          if (exec.hasMemoryInfo) {
+            if (storageLevel.useOffHeap) {
+              exec.usedOffHeap = addDeltaToValue(exec.usedOffHeap, -rddDist.offHeapUsed)
+            } else {
+              exec.usedOnHeap = addDeltaToValue(exec.usedOnHeap, -rddDist.onHeapUsed)
+            }
+          }
+          exec.memoryUsed = addDeltaToValue(exec.memoryUsed, -rddDist.memoryUsed)
+          exec.diskUsed = addDeltaToValue(exec.diskUsed, -rddDist.diskUsed)
+          maybeUpdate(exec, now)
+        }
+      }
+    }
+
     kvstore.delete(classOf[RDDStorageInfoWrapper], event.rddId)
   }
 
@@ -770,6 +800,11 @@ private[spark] class AppStatusListener(
       .sortBy(_.stageId)
   }
 
+  /**
+   * Apply a delta to a value, but ensure that it doesn't go negative.
+   */
+  private def addDeltaToValue(old: Long, delta: Long): Long = math.max(0, old + delta)
+
   private def updateRDDBlock(event: SparkListenerBlockUpdated, block: RDDBlockId): Unit = {
     val now = System.nanoTime()
     val executorId = event.blockUpdatedInfo.blockManagerId.executorId
@@ -779,9 +814,6 @@ private[spark] class AppStatusListener(
     val diskDelta = event.blockUpdatedInfo.diskSize * (if (storageLevel.useDisk) 1 else -1)
     val memoryDelta = event.blockUpdatedInfo.memSize * (if (storageLevel.useMemory) 1 else -1)
 
-    // Function to apply a delta to a value, but ensure that it doesn't go negative.
-    def newValue(old: Long, delta: Long): Long = math.max(0, old + delta)
-
     val updatedStorageLevel = if (storageLevel.isValid) {
       Some(storageLevel.description)
     } else {
@@ -798,13 +830,13 @@ private[spark] class AppStatusListener(
     maybeExec.foreach { exec =>
       if (exec.hasMemoryInfo) {
         if (storageLevel.useOffHeap) {
-          exec.usedOffHeap = newValue(exec.usedOffHeap, memoryDelta)
+          exec.usedOffHeap = addDeltaToValue(exec.usedOffHeap, memoryDelta)
         } else {
-          exec.usedOnHeap = newValue(exec.usedOnHeap, memoryDelta)
+          exec.usedOnHeap = addDeltaToValue(exec.usedOnHeap, memoryDelta)
         }
       }
-      exec.memoryUsed = newValue(exec.memoryUsed, memoryDelta)
-      exec.diskUsed = newValue(exec.diskUsed, diskDelta)
+      exec.memoryUsed = addDeltaToValue(exec.memoryUsed, memoryDelta)
+      exec.diskUsed = addDeltaToValue(exec.diskUsed, diskDelta)
     }
 
     // Update the block entry in the RDD info, keeping track of the deltas above so that we
@@ -832,8 +864,8 @@ private[spark] class AppStatusListener(
       // Only update the partition if it's still stored in some executor, otherwise get rid of it.
       if (executors.nonEmpty) {
         partition.update(executors, rdd.storageLevel,
-          newValue(partition.memoryUsed, memoryDelta),
-          newValue(partition.diskUsed, diskDelta))
+          addDeltaToValue(partition.memoryUsed, memoryDelta),
+          addDeltaToValue(partition.diskUsed, diskDelta))
       } else {
         rdd.removePartition(block.name)
       }
@@ -841,14 +873,14 @@ private[spark] class AppStatusListener(
       maybeExec.foreach { exec =>
         if (exec.rddBlocks + rddBlocksDelta > 0) {
           val dist = rdd.distribution(exec)
-          dist.memoryUsed = newValue(dist.memoryUsed, memoryDelta)
-          dist.diskUsed = newValue(dist.diskUsed, diskDelta)
+          dist.memoryUsed = addDeltaToValue(dist.memoryUsed, memoryDelta)
+          dist.diskUsed = addDeltaToValue(dist.diskUsed, diskDelta)
 
           if (exec.hasMemoryInfo) {
             if (storageLevel.useOffHeap) {
-              dist.offHeapUsed = newValue(dist.offHeapUsed, memoryDelta)
+              dist.offHeapUsed = addDeltaToValue(dist.offHeapUsed, memoryDelta)
             } else {
-              dist.onHeapUsed = newValue(dist.onHeapUsed, memoryDelta)
+              dist.onHeapUsed = addDeltaToValue(dist.onHeapUsed, memoryDelta)
             }
           }
           dist.lastUpdate = null
@@ -867,8 +899,8 @@ private[spark] class AppStatusListener(
         }
       }
 
-      rdd.memoryUsed = newValue(rdd.memoryUsed, memoryDelta)
-      rdd.diskUsed = newValue(rdd.diskUsed, diskDelta)
+      rdd.memoryUsed = addDeltaToValue(rdd.memoryUsed, memoryDelta)
+      rdd.diskUsed = addDeltaToValue(rdd.diskUsed, diskDelta)
       update(rdd, now)
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/14f3ad20/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
index a0b2458..762aed4 100644
--- a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
+++ b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
@@ -541,6 +541,10 @@ private class LiveRDD(val info: RDDInfo) extends LiveEntity {
     distributions.get(exec.executorId)
   }
 
+  def getPartitions(): scala.collection.Map[String, LiveRDDPartition] = partitions
+
+  def getDistributions(): scala.collection.Map[String, LiveRDDDistribution] = distributions
+
   override protected def doUpdate(): Any = {
     val dists = if (distributions.nonEmpty) {
       Some(distributions.values.map(_.toApi()).toSeq)

http://git-wip-us.apache.org/repos/asf/spark/blob/14f3ad20/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala b/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala
index 9ccc8f9..64e5c8b 100644
--- a/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala
+++ b/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala
@@ -55,7 +55,7 @@ class RDDInfo(
 }
 
 private[spark] object RDDInfo {
-  private val callsiteForm = SparkEnv.get.conf.get(EVENT_LOG_CALLSITE_FORM)
+  private lazy val callsiteForm = SparkEnv.get.conf.get(EVENT_LOG_CALLSITE_FORM)
 
   def fromRdd(rdd: RDD[_]): RDDInfo = {
     val rddName = Option(rdd.name).getOrElse(Utils.getFormattedClassName(rdd))
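
A note on the `lazy val` change above: this is presumably so that merely initializing the `RDDInfo` companion object (for example when constructing an `RDDInfo` directly in tests without a running `SparkEnv`) no longer triggers the `SparkEnv.get.conf` lookup; the config is read only when `fromRdd` actually needs it. A toy illustration of the strict-vs-lazy difference, using stand-in names (not the Spark sources):

```scala
// Stand-in illustration: a strict `val` in an object is evaluated as soon as the
// object is initialized, while a `lazy val` is only evaluated on first read, so
// code paths that never read it never hit its failure mode (e.g. a missing env).
object Holder {
  val eager: String = sys.props("example.setting")          // evaluated at object init
  lazy val deferred: String = sys.props("example.setting")  // evaluated on first read only
  def describe(): String = "holder"
}
// Holder.describe() initializes the object and evaluates `eager` (throwing if the
// property is unset), but leaves `deferred` untouched until someone reads it.
```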

http://git-wip-us.apache.org/repos/asf/spark/blob/14f3ad20/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
index d0c2dc4..0b2bbd2 100644
--- a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
@@ -882,12 +882,41 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
       assert(dist.memoryRemaining === maxMemory - rdd2b1.memSize - rdd1b2.memSize )
     }
 
+    // Add block1 of rdd1 back to bm 1.
+    listener.onBlockUpdated(SparkListenerBlockUpdated(
+      BlockUpdatedInfo(bm1, rdd1b1.blockId, level, rdd1b1.memSize, rdd1b1.diskSize)))
+
+    check[ExecutorSummaryWrapper](bm1.executorId) { exec =>
+      assert(exec.info.rddBlocks === 3L)
+      assert(exec.info.memoryUsed === rdd1b1.memSize + rdd1b2.memSize + rdd2b1.memSize)
+      assert(exec.info.diskUsed === rdd1b1.diskSize + rdd1b2.diskSize + rdd2b1.diskSize)
+    }
+
     // Unpersist RDD1.
     listener.onUnpersistRDD(SparkListenerUnpersistRDD(rdd1b1.rddId))
     intercept[NoSuchElementException] {
       check[RDDStorageInfoWrapper](rdd1b1.rddId) { _ => () }
     }
 
+    // executor1 now only contains block1 from rdd2.
+    check[ExecutorSummaryWrapper](bm1.executorId) { exec =>
+      assert(exec.info.rddBlocks === 1L)
+      assert(exec.info.memoryUsed === rdd2b1.memSize)
+      assert(exec.info.diskUsed === rdd2b1.diskSize)
+    }
+
+    // Unpersist RDD2.
+    listener.onUnpersistRDD(SparkListenerUnpersistRDD(rdd2b1.rddId))
+    intercept[NoSuchElementException] {
+      check[RDDStorageInfoWrapper](rdd2b1.rddId) { _ => () }
+    }
+
+    check[ExecutorSummaryWrapper](bm1.executorId) { exec =>
+      assert(exec.info.rddBlocks === 0L)
+      assert(exec.info.memoryUsed === 0)
+      assert(exec.info.diskUsed === 0)
+    }
+
     // Update a StreamBlock.
     val stream1 = StreamBlockId(1, 1L)
     listener.onBlockUpdated(SparkListenerBlockUpdated(

