Posted to commits@spark.apache.org by jo...@apache.org on 2016/11/08 00:14:31 UTC

spark git commit: [SPARK-18236] Reduce duplicate objects in Spark UI and HistoryServer

Repository: spark
Updated Branches:
  refs/heads/master 19cf20806 -> 3a710b94b


[SPARK-18236] Reduce duplicate objects in Spark UI and HistoryServer

## What changes were proposed in this pull request?

When profiling heap dumps from the HistoryServer and live Spark web UIs, I found a large amount of memory being wasted on duplicated objects and strings. This patch's changes remove most of this duplication, resulting in over 40% memory savings for some benchmarks.

- **Task metrics** (6441f0624dfcda9c7193a64bfb416a145b5aabdf): previously, every `TaskUIData` object had its own instances of `InputMetricsUIData`, `OutputMetricsUIData`, `ShuffleReadMetricsUIData`, and `ShuffleWriteMetricsUIData`, but for many tasks these metrics are irrelevant because they're all zero. This patch changes how we construct these metrics so that a single immutable "empty" instance is reused whenever the metrics are empty (a minimal sketch of the pattern follows this list).
- **TaskInfo.accumulables** (ade86db901127bf13c0e0bdc3f09c933a093bb76): previously, every `TaskInfo` object had its own empty `ListBuffer` for holding updates from named accumulators. Tasks which didn't use named accumulators still paid the cost of allocating and storing this empty buffer. To avoid this overhead, I replaced the `val` holding a mutable buffer with a `var` holding an immutable Scala list, allowing tasks with no named accumulator updates to share the same singleton `Nil` object.
- **String.intern() in JSONProtocol** (7e05630e9a78c455db8c8c499f0590c864624e05): in the HistoryServer, executor hostnames and IDs are deserialized from JSON, leading to massive duplication of these string objects. By calling `String.intern()` on the deserialized values we can remove all of this duplication. Since Spark now requires Java 7+, we don't have to worry about string interning exhausting the permgen (see http://java-performance.info/string-intern-in-java-6-7-8/).

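All three changes boil down to the same trick: when a per-task value is empty or repeated across many records, point every record at one shared immutable instance rather than allocating a fresh copy. The following is a minimal, self-contained Scala sketch of that pattern; the names (`RawInputMetrics`, `InputMetricsUI`, `TaskRecord`, `JsonFields`) are hypothetical stand-ins for illustration, not the actual Spark classes (those appear in the diff further down).

```scala
// Hypothetical, simplified stand-ins for the real classes touched by this
// patch (InputMetricsUIData, TaskInfo, JsonProtocol) -- not the actual Spark code.
case class RawInputMetrics(bytesRead: Long, recordsRead: Long)

case class InputMetricsUI(bytesRead: Long, recordsRead: Long)

object InputMetricsUI {
  // One shared instance serves every task whose input metrics are all zero,
  // instead of allocating a fresh all-zero object per task.
  private val EMPTY = InputMetricsUI(0L, 0L)

  def apply(m: RawInputMetrics): InputMetricsUI =
    if (m.bytesRead == 0 && m.recordsRead == 0) EMPTY
    else InputMetricsUI(m.bytesRead, m.recordsRead)
}

class TaskRecord {
  // An immutable Seq initialized to Nil: every task with no named accumulator
  // updates shares the same singleton empty list instead of owning a ListBuffer.
  private[this] var _accumulables: Seq[String] = Nil
  def accumulables: Seq[String] = _accumulables
  def setAccumulables(updates: Seq[String]): Unit = { _accumulables = updates }
}

object JsonFields {
  // Interning collapses equal hostname / executor-ID strings parsed from many
  // JSON events into one canonical String object.
  def parseHost(raw: String): String = raw.intern()
}
```
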
## How was this patch tested?

I ran

```
sc.parallelize(1 to 100000, 100000).count()
```

in `spark-shell` with event logging enabled, then loaded that event log in the HistoryServer, performed a full GC, and took a heap dump. According to YourKit, the changes in this patch reduced memory consumption by roughly 28 megabytes (or 770k Java objects):

![image](https://cloud.githubusercontent.com/assets/50748/19953276/4f3a28aa-a129-11e6-93df-d7fa91396f66.png)
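
For reference, a roughly equivalent standalone job is sketched below; it is an approximation of the setup described above, not the exact commands used. The app name, master, and event-log directory are placeholders, and the event-log directory must exist before the job runs.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object HeapBenchmark {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("spark-18236-heap-benchmark")       // placeholder app name
      .setMaster("local[*]")
      .set("spark.eventLog.enabled", "true")          // write an event log...
      .set("spark.eventLog.dir", "/tmp/spark-events") // ...to this directory
    val sc = new SparkContext(conf)
    try {
      // 100k tiny tasks produce 100k TaskInfo/TaskUIData records for the
      // live UI and the HistoryServer to hold on to.
      sc.parallelize(1 to 100000, 100000).count()
    } finally {
      sc.stop()
    }
  }
}
```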

Here's a table illustrating the drop in objects due to deduplication (the drop is <100k for some objects because some events were dropped from the listener bus; this is a separate, pre-existing bug that I'll address after CPU-profiling):

![image](https://cloud.githubusercontent.com/assets/50748/19953290/6a271290-a129-11e6-93ad-b825f1448886.png)

Author: Josh Rosen <jo...@databricks.com>

Closes #15743 from JoshRosen/spark-ui-memory-usage.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3a710b94
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3a710b94
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3a710b94

Branch: refs/heads/master
Commit: 3a710b94b0c853a2dd4c40dca446ecde4e7be959
Parents: 19cf208
Author: Josh Rosen <jo...@databricks.com>
Authored: Mon Nov 7 16:14:19 2016 -0800
Committer: Josh Rosen <jo...@databricks.com>
Committed: Mon Nov 7 16:14:19 2016 -0800

----------------------------------------------------------------------
 .../apache/spark/scheduler/DAGScheduler.scala   |  3 +-
 .../org/apache/spark/scheduler/TaskInfo.scala   | 10 ++-
 .../scala/org/apache/spark/ui/jobs/UIData.scala | 83 +++++++++++++++-----
 .../org/apache/spark/util/JsonProtocol.scala    | 10 +--
 .../ui/jobs/JobProgressListenerSuite.scala      |  2 +-
 .../apache/spark/util/JsonProtocolSuite.scala   |  7 +-
 project/MimaExcludes.scala                      |  5 +-
 .../sql/execution/ui/SQLListenerSuite.scala     |  2 +-
 8 files changed, 84 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3a710b94/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index f251740..7fde34d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1089,7 +1089,8 @@ class DAGScheduler(
         // To avoid UI cruft, ignore cases where value wasn't updated
         if (acc.name.isDefined && !updates.isZero) {
           stage.latestInfo.accumulables(id) = acc.toInfo(None, Some(acc.value))
-          event.taskInfo.accumulables += acc.toInfo(Some(updates.value), Some(acc.value))
+          event.taskInfo.setAccumulables(
+            acc.toInfo(Some(updates.value), Some(acc.value)) +: event.taskInfo.accumulables)
         }
       }
     } catch {

http://git-wip-us.apache.org/repos/asf/spark/blob/3a710b94/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
index eeb7963..5968013 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.scheduler
 
-import scala.collection.mutable.ListBuffer
-
 import org.apache.spark.TaskState
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.annotation.DeveloperApi
@@ -54,7 +52,13 @@ class TaskInfo(
    * accumulable to be updated multiple times in a single task or for two accumulables with the
    * same name but different IDs to exist in a task.
    */
-  val accumulables = ListBuffer[AccumulableInfo]()
+  def accumulables: Seq[AccumulableInfo] = _accumulables
+
+  private[this] var _accumulables: Seq[AccumulableInfo] = Nil
+
+  private[spark] def setAccumulables(newAccumulables: Seq[AccumulableInfo]): Unit = {
+    _accumulables = newAccumulables
+  }
 
   /**
    * The time when the task has completed successfully (including the time to remotely fetch

http://git-wip-us.apache.org/repos/asf/spark/blob/3a710b94/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
index f4a0460..9ce8542 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
@@ -21,7 +21,7 @@ import scala.collection.mutable
 import scala.collection.mutable.{HashMap, LinkedHashMap}
 
 import org.apache.spark.JobExecutionStatus
-import org.apache.spark.executor.{ShuffleReadMetrics, ShuffleWriteMetrics, TaskMetrics}
+import org.apache.spark.executor._
 import org.apache.spark.scheduler.{AccumulableInfo, TaskInfo}
 import org.apache.spark.util.AccumulatorContext
 import org.apache.spark.util.collection.OpenHashSet
@@ -147,9 +147,8 @@ private[spark] object UIData {
           memoryBytesSpilled = m.memoryBytesSpilled,
           diskBytesSpilled = m.diskBytesSpilled,
           peakExecutionMemory = m.peakExecutionMemory,
-          inputMetrics = InputMetricsUIData(m.inputMetrics.bytesRead, m.inputMetrics.recordsRead),
-          outputMetrics =
-            OutputMetricsUIData(m.outputMetrics.bytesWritten, m.outputMetrics.recordsWritten),
+          inputMetrics = InputMetricsUIData(m.inputMetrics),
+          outputMetrics = OutputMetricsUIData(m.outputMetrics),
           shuffleReadMetrics = ShuffleReadMetricsUIData(m.shuffleReadMetrics),
           shuffleWriteMetrics = ShuffleWriteMetricsUIData(m.shuffleWriteMetrics))
       }
@@ -171,9 +170,9 @@ private[spark] object UIData {
         speculative = taskInfo.speculative
       )
       newTaskInfo.gettingResultTime = taskInfo.gettingResultTime
-      newTaskInfo.accumulables ++= taskInfo.accumulables.filter {
+      newTaskInfo.setAccumulables(taskInfo.accumulables.filter {
         accum => !accum.internal && accum.metadata != Some(AccumulatorContext.SQL_ACCUM_IDENTIFIER)
-      }
+      })
       newTaskInfo.finishTime = taskInfo.finishTime
       newTaskInfo.failed = taskInfo.failed
       newTaskInfo
@@ -197,8 +196,32 @@ private[spark] object UIData {
       shuffleWriteMetrics: ShuffleWriteMetricsUIData)
 
   case class InputMetricsUIData(bytesRead: Long, recordsRead: Long)
+  object InputMetricsUIData {
+    def apply(metrics: InputMetrics): InputMetricsUIData = {
+      if (metrics.bytesRead == 0 && metrics.recordsRead == 0) {
+        EMPTY
+      } else {
+        new InputMetricsUIData(
+          bytesRead = metrics.bytesRead,
+          recordsRead = metrics.recordsRead)
+      }
+    }
+    private val EMPTY = InputMetricsUIData(0, 0)
+  }
 
   case class OutputMetricsUIData(bytesWritten: Long, recordsWritten: Long)
+  object OutputMetricsUIData {
+    def apply(metrics: OutputMetrics): OutputMetricsUIData = {
+      if (metrics.bytesWritten == 0 && metrics.recordsWritten == 0) {
+        EMPTY
+      } else {
+        new OutputMetricsUIData(
+          bytesWritten = metrics.bytesWritten,
+          recordsWritten = metrics.recordsWritten)
+      }
+    }
+    private val EMPTY = OutputMetricsUIData(0, 0)
+  }
 
   case class ShuffleReadMetricsUIData(
       remoteBlocksFetched: Long,
@@ -212,17 +235,30 @@ private[spark] object UIData {
 
   object ShuffleReadMetricsUIData {
     def apply(metrics: ShuffleReadMetrics): ShuffleReadMetricsUIData = {
-      new ShuffleReadMetricsUIData(
-        remoteBlocksFetched = metrics.remoteBlocksFetched,
-        localBlocksFetched = metrics.localBlocksFetched,
-        remoteBytesRead = metrics.remoteBytesRead,
-        localBytesRead = metrics.localBytesRead,
-        fetchWaitTime = metrics.fetchWaitTime,
-        recordsRead = metrics.recordsRead,
-        totalBytesRead = metrics.totalBytesRead,
-        totalBlocksFetched = metrics.totalBlocksFetched
-      )
+      if (
+          metrics.remoteBlocksFetched == 0 &&
+          metrics.localBlocksFetched == 0 &&
+          metrics.remoteBytesRead == 0 &&
+          metrics.localBytesRead == 0 &&
+          metrics.fetchWaitTime == 0 &&
+          metrics.recordsRead == 0 &&
+          metrics.totalBytesRead == 0 &&
+          metrics.totalBlocksFetched == 0) {
+        EMPTY
+      } else {
+        new ShuffleReadMetricsUIData(
+          remoteBlocksFetched = metrics.remoteBlocksFetched,
+          localBlocksFetched = metrics.localBlocksFetched,
+          remoteBytesRead = metrics.remoteBytesRead,
+          localBytesRead = metrics.localBytesRead,
+          fetchWaitTime = metrics.fetchWaitTime,
+          recordsRead = metrics.recordsRead,
+          totalBytesRead = metrics.totalBytesRead,
+          totalBlocksFetched = metrics.totalBlocksFetched
+        )
+      }
     }
+    private val EMPTY = ShuffleReadMetricsUIData(0, 0, 0, 0, 0, 0, 0, 0)
   }
 
   case class ShuffleWriteMetricsUIData(
@@ -232,12 +268,17 @@ private[spark] object UIData {
 
   object ShuffleWriteMetricsUIData {
     def apply(metrics: ShuffleWriteMetrics): ShuffleWriteMetricsUIData = {
-      new ShuffleWriteMetricsUIData(
-        bytesWritten = metrics.bytesWritten,
-        recordsWritten = metrics.recordsWritten,
-        writeTime = metrics.writeTime
-      )
+      if (metrics.bytesWritten == 0 && metrics.recordsWritten == 0 && metrics.writeTime == 0) {
+        EMPTY
+      } else {
+        new ShuffleWriteMetricsUIData(
+          bytesWritten = metrics.bytesWritten,
+          recordsWritten = metrics.recordsWritten,
+          writeTime = metrics.writeTime
+        )
+      }
     }
+    private val EMPTY = ShuffleWriteMetricsUIData(0, 0, 0)
   }
 
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/3a710b94/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 6593aab..4b4d2d1 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -702,8 +702,8 @@ private[spark] object JsonProtocol {
     val index = (json \ "Index").extract[Int]
     val attempt = Utils.jsonOption(json \ "Attempt").map(_.extract[Int]).getOrElse(1)
     val launchTime = (json \ "Launch Time").extract[Long]
-    val executorId = (json \ "Executor ID").extract[String]
-    val host = (json \ "Host").extract[String]
+    val executorId = (json \ "Executor ID").extract[String].intern()
+    val host = (json \ "Host").extract[String].intern()
     val taskLocality = TaskLocality.withName((json \ "Locality").extract[String])
     val speculative = Utils.jsonOption(json \ "Speculative").exists(_.extract[Boolean])
     val gettingResultTime = (json \ "Getting Result Time").extract[Long]
@@ -721,7 +721,7 @@ private[spark] object JsonProtocol {
     taskInfo.finishTime = finishTime
     taskInfo.failed = failed
     taskInfo.killed = killed
-    accumulables.foreach { taskInfo.accumulables += _ }
+    taskInfo.setAccumulables(accumulables)
     taskInfo
   }
 
@@ -903,8 +903,8 @@ private[spark] object JsonProtocol {
     if (json == JNothing) {
       return null
     }
-    val executorId = (json \ "Executor ID").extract[String]
-    val host = (json \ "Host").extract[String]
+    val executorId = (json \ "Executor ID").extract[String].intern()
+    val host = (json \ "Host").extract[String].intern()
     val port = (json \ "Port").extract[Int]
     BlockManagerId(executorId, host, port)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/3a710b94/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index 8418fa7..da853f1 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -403,7 +403,7 @@ class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with
       internal = false,
       countFailedValues = false,
       metadata = None)
-    taskInfo.accumulables ++= Seq(internalAccum, sqlAccum, userAccum)
+    taskInfo.setAccumulables(List(internalAccum, sqlAccum, userAccum))
 
     val newTaskInfo = TaskUIData.dropInternalAndSQLAccumulables(taskInfo)
     assert(newTaskInfo.accumulables === Seq(userAccum))

http://git-wip-us.apache.org/repos/asf/spark/blob/3a710b94/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index d5146d7..85da791 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -788,11 +788,8 @@ private[spark] object JsonProtocolSuite extends Assertions {
   private def makeTaskInfo(a: Long, b: Int, c: Int, d: Long, speculative: Boolean) = {
     val taskInfo = new TaskInfo(a, b, c, d, "executor", "your kind sir", TaskLocality.NODE_LOCAL,
       speculative)
-    val (acc1, acc2, acc3) =
-      (makeAccumulableInfo(1), makeAccumulableInfo(2), makeAccumulableInfo(3, internal = true))
-    taskInfo.accumulables += acc1
-    taskInfo.accumulables += acc2
-    taskInfo.accumulables += acc3
+    taskInfo.setAccumulables(
+      List(makeAccumulableInfo(1), makeAccumulableInfo(2), makeAccumulableInfo(3, internal = true)))
     taskInfo
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/3a710b94/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 350b144..12f7ed2 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -86,7 +86,10 @@ object MimaExcludes {
       // [SPARK-18034] Upgrade to MiMa 0.1.11 to fix flakiness.
       ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasAggregationDepth.aggregationDepth"),
       ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasAggregationDepth.getAggregationDepth"),
-      ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasAggregationDepth.org$apache$spark$ml$param$shared$HasAggregationDepth$_setter_$aggregationDepth_=")
+      ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasAggregationDepth.org$apache$spark$ml$param$shared$HasAggregationDepth$_setter_$aggregationDepth_="),
+
+      // [SPARK-18236] Reduce duplicate objects in Spark UI and HistoryServer
+      ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.scheduler.TaskInfo.accumulables")
     )
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/3a710b94/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
index 19b6d26..948a155 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
@@ -374,7 +374,7 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
     val sqlMetricInfo = sqlMetric.toInfo(Some(sqlMetric.value), None)
     val nonSqlMetricInfo = nonSqlMetric.toInfo(Some(nonSqlMetric.value), None)
     val taskInfo = createTaskInfo(0, 0)
-    taskInfo.accumulables ++= Seq(sqlMetricInfo, nonSqlMetricInfo)
+    taskInfo.setAccumulables(List(sqlMetricInfo, nonSqlMetricInfo))
     val taskEnd = SparkListenerTaskEnd(0, 0, "just-a-task", null, taskInfo, null)
     listener.onOtherEvent(executionStart)
     listener.onJobStart(jobStart)

