Posted to commits@spark.apache.org by ir...@apache.org on 2017/04/06 18:24:06 UTC

[2/2] spark git commit: [SPARK-17019][CORE] Expose on-heap and off-heap memory usage in various places

[SPARK-17019][CORE] Expose on-heap and off-heap memory usage in various places

## What changes were proposed in this pull request?

With [SPARK-13992](https://issues.apache.org/jira/browse/SPARK-13992), Spark supports persisting data into off-heap memory, but the usage of on-heap and off-heap memory is not currently exposed. This makes it inconvenient for users to monitor and profile, so this patch proposes to expose off-heap as well as on-heap memory usage in various places:
1. The Spark UI's executor page displays both on-heap and off-heap memory usage.
2. REST requests return both on-heap and off-heap memory usage.
3. The memory usage can also be obtained from the MetricsSystem.
4. Finally, the usage can be obtained programmatically from a SparkListener (see the sketch after this list).
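
As a minimal sketch of item 4 (not part of this patch; the listener name and the println are illustrative), the new optional fields of `SparkListenerBlockManagerAdded` can be read like this:

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerBlockManagerAdded}

// Logs per-block-manager storage memory maxima as executors register.
// maxOnHeapMem / maxOffHeapMem are Option[Long]: None when replaying
// event logs written before this change.
class MemoryReportingListener extends SparkListener {
  override def onBlockManagerAdded(event: SparkListenerBlockManagerAdded): Unit = {
    val onHeap = event.maxOnHeapMem.getOrElse(0L)
    val offHeap = event.maxOffHeapMem.getOrElse(0L)
    println(s"${event.blockManagerId}: onHeap=$onHeap bytes, " +
      s"offHeap=$offHeap bytes, total=${event.maxMem} bytes")
  }
}
```

Register it with `sc.addSparkListener(new MemoryReportingListener)`, or via the `spark.extraListeners` configuration.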

Attached are the UI changes:

![screen shot 2016-08-12 at 11 20 44 am](https://cloud.githubusercontent.com/assets/850797/17612032/6c2f4480-607f-11e6-82e8-a27fb8cbb4ae.png)

Backward compatibility is also considered for the event log and the REST API. Old event logs can still be replayed, with off-heap usage displayed as 0. For the REST API, only new fields are added, so JSON backward compatibility is preserved.
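
For reference, here is what a `SparkListenerBlockManagerAdded` event looks like in the event log after this change — a sketch assembled from the `JsonProtocol` hunk below (the byte and timestamp values are illustrative). Logs that lack the last two fields still parse; the options simply come back empty:

```json
{
  "Event" : "SparkListenerBlockManagerAdded",
  "Block Manager ID" : {
    "Executor ID" : "1",
    "Host" : "172.22.0.167",
    "Port" : 51490
  },
  "Maximum Memory" : 908381388,
  "Timestamp" : 1479335611477,
  "Maximum Onheap Memory" : 384093388,
  "Maximum Offheap Memory" : 524288000
}
```
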
## How was this patch tested?

Unit tests were added, along with manual verification.

Author: jerryshao <ss...@hortonworks.com>

Closes #14617 from jerryshao/SPARK-17019.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a4491626
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a4491626
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a4491626

Branch: refs/heads/master
Commit: a4491626ed8169f0162a0dfb78736c9b9e7fb434
Parents: 5a693b4
Author: jerryshao <ss...@hortonworks.com>
Authored: Thu Apr 6 13:23:54 2017 -0500
Committer: Imran Rashid <ir...@cloudera.com>
Committed: Thu Apr 6 13:23:54 2017 -0500

----------------------------------------------------------------------
 .../spark/ui/static/executorspage-template.html |  18 ++-
 .../org/apache/spark/ui/static/executorspage.js | 103 +++++++++++++-
 .../org/apache/spark/ui/static/webui.css        |   3 +-
 .../apache/spark/scheduler/SparkListener.scala  |   9 +-
 .../spark/status/api/v1/AllRDDResource.scala    |   8 +-
 .../org/apache/spark/status/api/v1/api.scala    |  12 +-
 .../org/apache/spark/storage/BlockManager.scala |   9 +-
 .../spark/storage/BlockManagerMaster.scala      |   5 +-
 .../storage/BlockManagerMasterEndpoint.scala    |  22 +--
 .../spark/storage/BlockManagerMessages.scala    |   3 +-
 .../spark/storage/BlockManagerSource.scala      |  66 ++++-----
 .../spark/storage/StorageStatusListener.scala   |   8 +-
 .../org/apache/spark/storage/StorageUtils.scala |  99 ++++++++-----
 .../apache/spark/ui/exec/ExecutorsPage.scala    |  46 +++++-
 .../org/apache/spark/ui/storage/RDDPage.scala   |  11 +-
 .../org/apache/spark/util/JsonProtocol.scala    |   8 +-
 .../executor_memory_usage_expectation.json      | 139 +++++++++++++++++++
 .../executor_node_blacklisting_expectation.json |  41 ++++--
 .../spark-events/app-20161116163331-0000        |  10 +-
 .../deploy/history/HistoryServerSuite.scala     |   3 +-
 .../org/apache/spark/storage/StorageSuite.scala |  87 +++++++++++-
 .../org/apache/spark/ui/UISeleniumSuite.scala   |  36 ++++-
 project/MimaExcludes.scala                      |  11 +-
 23 files changed, 638 insertions(+), 119 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a4491626/core/src/main/resources/org/apache/spark/ui/static/executorspage-template.html
----------------------------------------------------------------------
diff --git a/core/src/main/resources/org/apache/spark/ui/static/executorspage-template.html b/core/src/main/resources/org/apache/spark/ui/static/executorspage-template.html
index 4e83d6d..5c91304 100644
--- a/core/src/main/resources/org/apache/spark/ui/static/executorspage-template.html
+++ b/core/src/main/resources/org/apache/spark/ui/static/executorspage-template.html
@@ -24,7 +24,15 @@ limitations under the License.
         <th></th>
         <th>RDD Blocks</th>
         <th><span data-toggle="tooltip"
-                  title="Memory used / total available memory for storage of data like RDD partitions cached in memory. ">Storage Memory</span>
+                  title="Memory used / total available memory for storage of data like RDD partitions cached in memory.">Storage Memory</span>
+        </th>
+        <th class="on_heap_memory">
+          <span data-toggle="tooltip"
+                title="Memory used / total available memory for on heap storage of data like RDD partitions cached in memory.">On Heap Storage Memory</span>
+        </th>
+        <th class="off_heap_memory">
+          <span data-toggle="tooltip"
+                title="Memory used / total available memory for off heap storage of data like RDD partitions cached in memory.">Off Heap Storage Memory</span>
         </th>
         <th>Disk Used</th>
         <th>Cores</th>
@@ -73,6 +81,14 @@ limitations under the License.
             <span data-toggle="tooltip" data-placement="top"
                   title="Memory used / total available memory for storage of data like RDD partitions cached in memory.">
               Storage Memory</span></th>
+          <th class="on_heap_memory">
+            <span data-toggle="tooltip" data-placement="top"
+                  title="Memory used / total available memory for on heap storage of data like RDD partitions cached in memory.">
+              On Heap Storage Memory</span></th>
+          <th class="off_heap_memory">
+            <span data-toggle="tooltip"
+                  title="Memory used / total available memory for off heap storage of data like RDD partitions cached in memory.">
+              Off Heap Storage Memory</span></th>
           <th><span data-toggle="tooltip" data-placement="top" title="Disk Used">Disk Used</span></th>
           <th><span data-toggle="tooltip" data-placement="top" title="Cores">Cores</span></th>
           <th><span data-toggle="tooltip" data-placement="top" title="Active Tasks">Active Tasks</span></th>

http://git-wip-us.apache.org/repos/asf/spark/blob/a4491626/core/src/main/resources/org/apache/spark/ui/static/executorspage.js
----------------------------------------------------------------------
diff --git a/core/src/main/resources/org/apache/spark/ui/static/executorspage.js b/core/src/main/resources/org/apache/spark/ui/static/executorspage.js
index 7dbfe32..930a069 100644
--- a/core/src/main/resources/org/apache/spark/ui/static/executorspage.js
+++ b/core/src/main/resources/org/apache/spark/ui/static/executorspage.js
@@ -190,6 +190,10 @@ $(document).ready(function () {
             var allRDDBlocks = 0;
             var allMemoryUsed = 0;
             var allMaxMemory = 0;
+            var allOnHeapMemoryUsed = 0;
+            var allOnHeapMaxMemory = 0;
+            var allOffHeapMemoryUsed = 0;
+            var allOffHeapMaxMemory = 0;
             var allDiskUsed = 0;
             var allTotalCores = 0;
             var allMaxTasks = 0;
@@ -208,6 +212,10 @@ $(document).ready(function () {
             var activeRDDBlocks = 0;
             var activeMemoryUsed = 0;
             var activeMaxMemory = 0;
+            var activeOnHeapMemoryUsed = 0;
+            var activeOnHeapMaxMemory = 0;
+            var activeOffHeapMemoryUsed = 0;
+            var activeOffHeapMaxMemory = 0;
             var activeDiskUsed = 0;
             var activeTotalCores = 0;
             var activeMaxTasks = 0;
@@ -226,6 +234,10 @@ $(document).ready(function () {
             var deadRDDBlocks = 0;
             var deadMemoryUsed = 0;
             var deadMaxMemory = 0;
+            var deadOnHeapMemoryUsed = 0;
+            var deadOnHeapMaxMemory = 0;
+            var deadOffHeapMemoryUsed = 0;
+            var deadOffHeapMaxMemory = 0;
             var deadDiskUsed = 0;
             var deadTotalCores = 0;
             var deadMaxTasks = 0;
@@ -241,10 +253,21 @@ $(document).ready(function () {
             var deadTotalBlacklisted = 0;
 
             response.forEach(function (exec) {
+                exec.onHeapMemoryUsed = exec.hasOwnProperty('onHeapMemoryUsed') ? exec.onHeapMemoryUsed : 0;
+                exec.maxOnHeapMemory = exec.hasOwnProperty('maxOnHeapMemory') ? exec.maxOnHeapMemory : 0;
+                exec.offHeapMemoryUsed = exec.hasOwnProperty('offHeapMemoryUsed') ? exec.offHeapMemoryUsed : 0;
+                exec.maxOffHeapMemory = exec.hasOwnProperty('maxOffHeapMemory') ? exec.maxOffHeapMemory : 0;
+            });
+
+            response.forEach(function (exec) {
                 allExecCnt += 1;
                 allRDDBlocks += exec.rddBlocks;
                 allMemoryUsed += exec.memoryUsed;
                 allMaxMemory += exec.maxMemory;
+                allOnHeapMemoryUsed += exec.onHeapMemoryUsed;
+                allOnHeapMaxMemory += exec.maxOnHeapMemory;
+                allOffHeapMemoryUsed += exec.offHeapMemoryUsed;
+                allOffHeapMaxMemory += exec.maxOffHeapMemory;
                 allDiskUsed += exec.diskUsed;
                 allTotalCores += exec.totalCores;
                 allMaxTasks += exec.maxTasks;
@@ -263,6 +286,10 @@ $(document).ready(function () {
                     activeRDDBlocks += exec.rddBlocks;
                     activeMemoryUsed += exec.memoryUsed;
                     activeMaxMemory += exec.maxMemory;
+                    activeOnHeapMemoryUsed += exec.onHeapMemoryUsed;
+                    activeOnHeapMaxMemory += exec.maxOnHeapMemory;
+                    activeOffHeapMemoryUsed += exec.offHeapMemoryUsed;
+                    activeOffHeapMaxMemory += exec.maxOffHeapMemory;
                     activeDiskUsed += exec.diskUsed;
                     activeTotalCores += exec.totalCores;
                     activeMaxTasks += exec.maxTasks;
@@ -281,6 +308,10 @@ $(document).ready(function () {
                     deadRDDBlocks += exec.rddBlocks;
                     deadMemoryUsed += exec.memoryUsed;
                     deadMaxMemory += exec.maxMemory;
+                    deadOnHeapMemoryUsed += exec.onHeapMemoryUsed;
+                    deadOnHeapMaxMemory += exec.maxOnHeapMemory;
+                    deadOffHeapMemoryUsed += exec.offHeapMemoryUsed;
+                    deadOffHeapMaxMemory += exec.maxOffHeapMemory;
                     deadDiskUsed += exec.diskUsed;
                     deadTotalCores += exec.totalCores;
                     deadMaxTasks += exec.maxTasks;
@@ -302,6 +333,10 @@ $(document).ready(function () {
                 "allRDDBlocks": allRDDBlocks,
                 "allMemoryUsed": allMemoryUsed,
                 "allMaxMemory": allMaxMemory,
+                "allOnHeapMemoryUsed": allOnHeapMemoryUsed,
+                "allOnHeapMaxMemory": allOnHeapMaxMemory,
+                "allOffHeapMemoryUsed": allOffHeapMemoryUsed,
+                "allOffHeapMaxMemory": allOffHeapMaxMemory,
                 "allDiskUsed": allDiskUsed,
                 "allTotalCores": allTotalCores,
                 "allMaxTasks": allMaxTasks,
@@ -321,6 +356,10 @@ $(document).ready(function () {
                 "allRDDBlocks": activeRDDBlocks,
                 "allMemoryUsed": activeMemoryUsed,
                 "allMaxMemory": activeMaxMemory,
+                "allOnHeapMemoryUsed": activeOnHeapMemoryUsed,
+                "allOnHeapMaxMemory": activeOnHeapMaxMemory,
+                "allOffHeapMemoryUsed": activeOffHeapMemoryUsed,
+                "allOffHeapMaxMemory": activeOffHeapMaxMemory,
                 "allDiskUsed": activeDiskUsed,
                 "allTotalCores": activeTotalCores,
                 "allMaxTasks": activeMaxTasks,
@@ -340,6 +379,10 @@ $(document).ready(function () {
                 "allRDDBlocks": deadRDDBlocks,
                 "allMemoryUsed": deadMemoryUsed,
                 "allMaxMemory": deadMaxMemory,
+                "allOnHeapMemoryUsed": deadOnHeapMemoryUsed,
+                "allOnHeapMaxMemory": deadOnHeapMaxMemory,
+                "allOffHeapMemoryUsed": deadOffHeapMemoryUsed,
+                "allOffHeapMaxMemory": deadOffHeapMaxMemory,
                 "allDiskUsed": deadDiskUsed,
                 "allTotalCores": deadTotalCores,
                 "allMaxTasks": deadMaxTasks,
@@ -378,7 +421,35 @@ $(document).ready(function () {
                         {data: 'rddBlocks'},
                         {
                             data: function (row, type) {
-                                return type === 'display' ? (formatBytes(row.memoryUsed, type) + ' / ' + formatBytes(row.maxMemory, type)) : row.memoryUsed;
+                                if (type !== 'display')
+                                    return row.memoryUsed;
+                                else
+                                    return (formatBytes(row.memoryUsed, type) + ' / ' +
+                                        formatBytes(row.maxMemory, type));
+                            }
+                        },
+                        {
+                            data: function (row, type) {
+                                if (type !== 'display')
+                                    return row.onHeapMemoryUsed;
+                                else
+                                    return (formatBytes(row.onHeapMemoryUsed, type) + ' / ' +
+                                        formatBytes(row.maxOnHeapMemory, type));
+                            },
+                            "fnCreatedCell": function (nTd, sData, oData, iRow, iCol) {
+                                $(nTd).addClass('on_heap_memory')
+                            }
+                        },
+                        {
+                            data: function (row, type) {
+                                if (type !== 'display')
+                                    return row.offHeapMemoryUsed;
+                                else
+                                    return (formatBytes(row.offHeapMemoryUsed, type) + ' / ' +
+                                        formatBytes(row.maxOffHeapMemory, type));
+                            },
+                            "fnCreatedCell": function (nTd, sData, oData, iRow, iCol) {
+                                $(nTd).addClass('off_heap_memory')
                             }
                         },
                         {data: 'diskUsed', render: formatBytes},
@@ -450,7 +521,35 @@ $(document).ready(function () {
                         {data: 'allRDDBlocks'},
                         {
                             data: function (row, type) {
-                                return type === 'display' ? (formatBytes(row.allMemoryUsed, type) + ' / ' + formatBytes(row.allMaxMemory, type)) : row.allMemoryUsed;
+                                if (type !== 'display')
+                                    return row.allMemoryUsed
+                                else
+                                    return (formatBytes(row.allMemoryUsed, type) + ' / ' +
+                                        formatBytes(row.allMaxMemory, type));
+                            }
+                        },
+                        {
+                            data: function (row, type) {
+                                if (type !== 'display')
+                                    return row.allOnHeapMemoryUsed;
+                                else
+                                    return (formatBytes(row.allOnHeapMemoryUsed, type) + ' / ' +
+                                        formatBytes(row.allOnHeapMaxMemory, type));
+                            },
+                            "fnCreatedCell": function (nTd, sData, oData, iRow, iCol) {
+                                $(nTd).addClass('on_heap_memory')
+                            }
+                        },
+                        {
+                            data: function (row, type) {
+                                if (type !== 'display')
+                                    return row.allOffHeapMemoryUsed;
+                                else
+                                    return (formatBytes(row.allOffHeapMemoryUsed, type) + ' / ' +
+                                        formatBytes(row.allOffHeapMaxMemory, type));
+                            },
+                            "fnCreatedCell": function (nTd, sData, oData, iRow, iCol) {
+                                $(nTd).addClass('off_heap_memory')
                             }
                         },
                         {data: 'allDiskUsed', render: formatBytes},

http://git-wip-us.apache.org/repos/asf/spark/blob/a4491626/core/src/main/resources/org/apache/spark/ui/static/webui.css
----------------------------------------------------------------------
diff --git a/core/src/main/resources/org/apache/spark/ui/static/webui.css b/core/src/main/resources/org/apache/spark/ui/static/webui.css
index 319a719..935d9b1 100644
--- a/core/src/main/resources/org/apache/spark/ui/static/webui.css
+++ b/core/src/main/resources/org/apache/spark/ui/static/webui.css
@@ -205,7 +205,8 @@ span.additional-metric-title {
 /* Hide all additional metrics by default. This is done here rather than using JavaScript to
  * avoid slow page loads for stage pages with large numbers (e.g., thousands) of tasks. */
 .scheduler_delay, .deserialization_time, .fetch_wait_time, .shuffle_read_remote,
-.serialization_time, .getting_result_time, .peak_execution_memory {
+.serialization_time, .getting_result_time, .peak_execution_memory,
+.on_heap_memory, .off_heap_memory {
   display: none;
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/a4491626/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index 4331add..bc2e530 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -87,8 +87,13 @@ case class SparkListenerEnvironmentUpdate(environmentDetails: Map[String, Seq[(S
   extends SparkListenerEvent
 
 @DeveloperApi
-case class SparkListenerBlockManagerAdded(time: Long, blockManagerId: BlockManagerId, maxMem: Long)
-  extends SparkListenerEvent
+case class SparkListenerBlockManagerAdded(
+    time: Long,
+    blockManagerId: BlockManagerId,
+    maxMem: Long,
+    maxOnHeapMem: Option[Long] = None,
+    maxOffHeapMem: Option[Long] = None) extends SparkListenerEvent {
+}
 
 @DeveloperApi
 case class SparkListenerBlockManagerRemoved(time: Long, blockManagerId: BlockManagerId)

http://git-wip-us.apache.org/repos/asf/spark/blob/a4491626/core/src/main/scala/org/apache/spark/status/api/v1/AllRDDResource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/AllRDDResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/AllRDDResource.scala
index 5c03609..1279b28 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/AllRDDResource.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/AllRDDResource.scala
@@ -70,7 +70,13 @@ private[spark] object AllRDDResource {
           address = status.blockManagerId.hostPort,
           memoryUsed = status.memUsedByRdd(rddId),
           memoryRemaining = status.memRemaining,
-          diskUsed = status.diskUsedByRdd(rddId)
+          diskUsed = status.diskUsedByRdd(rddId),
+          onHeapMemoryUsed = Some(
+            if (!rddInfo.storageLevel.useOffHeap) status.memUsedByRdd(rddId) else 0L),
+          offHeapMemoryUsed = Some(
+            if (rddInfo.storageLevel.useOffHeap) status.memUsedByRdd(rddId) else 0L),
+          onHeapMemoryRemaining = status.onHeapMemRemaining,
+          offHeapMemoryRemaining = status.offHeapMemRemaining
         ) } )
     } else {
       None

http://git-wip-us.apache.org/repos/asf/spark/blob/a4491626/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
index 5b92273..d159b94 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
@@ -75,7 +75,11 @@ class ExecutorSummary private[spark](
     val totalShuffleWrite: Long,
     val isBlacklisted: Boolean,
     val maxMemory: Long,
-    val executorLogs: Map[String, String])
+    val executorLogs: Map[String, String],
+    val onHeapMemoryUsed: Option[Long],
+    val offHeapMemoryUsed: Option[Long],
+    val maxOnHeapMemory: Option[Long],
+    val maxOffHeapMemory: Option[Long])
 
 class JobData private[spark](
     val jobId: Int,
@@ -111,7 +115,11 @@ class RDDDataDistribution private[spark](
     val address: String,
     val memoryUsed: Long,
     val memoryRemaining: Long,
-    val diskUsed: Long)
+    val diskUsed: Long,
+    val onHeapMemoryUsed: Option[Long],
+    val offHeapMemoryUsed: Option[Long],
+    val onHeapMemoryRemaining: Option[Long],
+    val offHeapMemoryRemaining: Option[Long])
 
 class RDDPartitionInfo private[spark](
     val blockName: String,
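
These new optional `ExecutorSummary` fields surface through the executors REST endpoint, `/api/v1/applications/[app-id]/executors`. An abridged, illustrative response fragment (the full expected payloads are in the test expectation JSON files further down):

```json
{
  "id" : "1",
  "maxMemory" : 908381388,
  "onHeapMemoryUsed" : 0,
  "offHeapMemoryUsed" : 0,
  "maxOnHeapMemory" : 384093388,
  "maxOffHeapMemory" : 524288000
}
```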

http://git-wip-us.apache.org/repos/asf/spark/blob/a4491626/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 46a078b..63acba6 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -150,8 +150,8 @@ private[spark] class BlockManager(
   // However, since we use this only for reporting and logging, what we actually want here is
   // the absolute maximum value that `maxMemory` can ever possibly reach. We may need
   // to revisit whether reporting this value as the "max" is intuitive to the user.
-  private val maxMemory =
-    memoryManager.maxOnHeapStorageMemory + memoryManager.maxOffHeapStorageMemory
+  private val maxOnHeapMemory = memoryManager.maxOnHeapStorageMemory
+  private val maxOffHeapMemory = memoryManager.maxOffHeapStorageMemory
 
   // Port used by the external shuffle service. In Yarn mode, this may be already be
   // set through the Hadoop configuration as the server is launched in the Yarn NM.
@@ -229,7 +229,8 @@ private[spark] class BlockManager(
 
     val idFromMaster = master.registerBlockManager(
       id,
-      maxMemory,
+      maxOnHeapMemory,
+      maxOffHeapMemory,
       slaveEndpoint)
 
     blockManagerId = if (idFromMaster != null) idFromMaster else id
@@ -307,7 +308,7 @@ private[spark] class BlockManager(
   def reregister(): Unit = {
     // TODO: We might need to rate limit re-registering.
     logInfo(s"BlockManager $blockManagerId re-registering with master")
-    master.registerBlockManager(blockManagerId, maxMemory, slaveEndpoint)
+    master.registerBlockManager(blockManagerId, maxOnHeapMemory, maxOffHeapMemory, slaveEndpoint)
     reportAllBlocks()
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/a4491626/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
index 3ca690d..ea5d842 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -57,11 +57,12 @@ class BlockManagerMaster(
    */
   def registerBlockManager(
       blockManagerId: BlockManagerId,
-      maxMemSize: Long,
+      maxOnHeapMemSize: Long,
+      maxOffHeapMemSize: Long,
       slaveEndpoint: RpcEndpointRef): BlockManagerId = {
     logInfo(s"Registering BlockManager $blockManagerId")
     val updatedId = driverEndpoint.askSync[BlockManagerId](
-      RegisterBlockManager(blockManagerId, maxMemSize, slaveEndpoint))
+      RegisterBlockManager(blockManagerId, maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint))
     logInfo(s"Registered BlockManager $updatedId")
     updatedId
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/a4491626/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
index 84c04d2..467c3e0 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
@@ -71,8 +71,8 @@ class BlockManagerMasterEndpoint(
   logInfo("BlockManagerMasterEndpoint up")
 
   override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
-    case RegisterBlockManager(blockManagerId, maxMemSize, slaveEndpoint) =>
-      context.reply(register(blockManagerId, maxMemSize, slaveEndpoint))
+    case RegisterBlockManager(blockManagerId, maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint) =>
+      context.reply(register(blockManagerId, maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint))
 
     case _updateBlockInfo @
         UpdateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size) =>
@@ -276,7 +276,8 @@ class BlockManagerMasterEndpoint(
 
   private def storageStatus: Array[StorageStatus] = {
     blockManagerInfo.map { case (blockManagerId, info) =>
-      new StorageStatus(blockManagerId, info.maxMem, info.blocks.asScala)
+      new StorageStatus(blockManagerId, info.maxMem, Some(info.maxOnHeapMem),
+        Some(info.maxOffHeapMem), info.blocks.asScala)
     }.toArray
   }
 
@@ -338,7 +339,8 @@ class BlockManagerMasterEndpoint(
    */
   private def register(
       idWithoutTopologyInfo: BlockManagerId,
-      maxMemSize: Long,
+      maxOnHeapMemSize: Long,
+      maxOffHeapMemSize: Long,
       slaveEndpoint: RpcEndpointRef): BlockManagerId = {
     // the dummy id is not expected to contain the topology information.
     // we get that info here and respond back with a more fleshed out block manager id
@@ -359,14 +361,15 @@ class BlockManagerMasterEndpoint(
         case None =>
       }
       logInfo("Registering block manager %s with %s RAM, %s".format(
-        id.hostPort, Utils.bytesToString(maxMemSize), id))
+        id.hostPort, Utils.bytesToString(maxOnHeapMemSize + maxOffHeapMemSize), id))
 
       blockManagerIdByExecutor(id.executorId) = id
 
       blockManagerInfo(id) = new BlockManagerInfo(
-        id, System.currentTimeMillis(), maxMemSize, slaveEndpoint)
+        id, System.currentTimeMillis(), maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint)
     }
-    listenerBus.post(SparkListenerBlockManagerAdded(time, id, maxMemSize))
+    listenerBus.post(SparkListenerBlockManagerAdded(time, id, maxOnHeapMemSize + maxOffHeapMemSize,
+        Some(maxOnHeapMemSize), Some(maxOffHeapMemSize)))
     id
   }
 
@@ -464,10 +467,13 @@ object BlockStatus {
 private[spark] class BlockManagerInfo(
     val blockManagerId: BlockManagerId,
     timeMs: Long,
-    val maxMem: Long,
+    val maxOnHeapMem: Long,
+    val maxOffHeapMem: Long,
     val slaveEndpoint: RpcEndpointRef)
   extends Logging {
 
+  val maxMem = maxOnHeapMem + maxOffHeapMem
+
   private var _lastSeenMs: Long = timeMs
   private var _remainingMem: Long = maxMem
 

http://git-wip-us.apache.org/repos/asf/spark/blob/a4491626/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
index 0aea438..0c0ff14 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
@@ -58,7 +58,8 @@ private[spark] object BlockManagerMessages {
 
   case class RegisterBlockManager(
       blockManagerId: BlockManagerId,
-      maxMemSize: Long,
+      maxOnHeapMemSize: Long,
+      maxOffHeapMemSize: Long,
       sender: RpcEndpointRef)
     extends ToBlockManagerMaster
 

http://git-wip-us.apache.org/repos/asf/spark/blob/a4491626/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala
index c5ba9af..197a017 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala
@@ -26,35 +26,39 @@ private[spark] class BlockManagerSource(val blockManager: BlockManager)
   override val metricRegistry = new MetricRegistry()
   override val sourceName = "BlockManager"
 
-  metricRegistry.register(MetricRegistry.name("memory", "maxMem_MB"), new Gauge[Long] {
-    override def getValue: Long = {
-      val storageStatusList = blockManager.master.getStorageStatus
-      val maxMem = storageStatusList.map(_.maxMem).sum
-      maxMem / 1024 / 1024
-    }
-  })
-
-  metricRegistry.register(MetricRegistry.name("memory", "remainingMem_MB"), new Gauge[Long] {
-    override def getValue: Long = {
-      val storageStatusList = blockManager.master.getStorageStatus
-      val remainingMem = storageStatusList.map(_.memRemaining).sum
-      remainingMem / 1024 / 1024
-    }
-  })
-
-  metricRegistry.register(MetricRegistry.name("memory", "memUsed_MB"), new Gauge[Long] {
-    override def getValue: Long = {
-      val storageStatusList = blockManager.master.getStorageStatus
-      val memUsed = storageStatusList.map(_.memUsed).sum
-      memUsed / 1024 / 1024
-    }
-  })
-
-  metricRegistry.register(MetricRegistry.name("disk", "diskSpaceUsed_MB"), new Gauge[Long] {
-    override def getValue: Long = {
-      val storageStatusList = blockManager.master.getStorageStatus
-      val diskSpaceUsed = storageStatusList.map(_.diskUsed).sum
-      diskSpaceUsed / 1024 / 1024
-    }
-  })
+  private def registerGauge(name: String, func: BlockManagerMaster => Long): Unit = {
+    metricRegistry.register(name, new Gauge[Long] {
+      override def getValue: Long = func(blockManager.master) / 1024 / 1024
+    })
+  }
+
+  registerGauge(MetricRegistry.name("memory", "maxMem_MB"),
+    _.getStorageStatus.map(_.maxMem).sum)
+
+  registerGauge(MetricRegistry.name("memory", "maxOnHeapMem_MB"),
+    _.getStorageStatus.map(_.maxOnHeapMem.getOrElse(0L)).sum)
+
+  registerGauge(MetricRegistry.name("memory", "maxOffHeapMem_MB"),
+    _.getStorageStatus.map(_.maxOffHeapMem.getOrElse(0L)).sum)
+
+  registerGauge(MetricRegistry.name("memory", "remainingMem_MB"),
+    _.getStorageStatus.map(_.memRemaining).sum)
+
+  registerGauge(MetricRegistry.name("memory", "remainingOnHeapMem_MB"),
+    _.getStorageStatus.map(_.onHeapMemRemaining.getOrElse(0L)).sum)
+
+  registerGauge(MetricRegistry.name("memory", "remainingOffHeapMem_MB"),
+    _.getStorageStatus.map(_.offHeapMemRemaining.getOrElse(0L)).sum)
+
+  registerGauge(MetricRegistry.name("memory", "memUsed_MB"),
+    _.getStorageStatus.map(_.memUsed).sum)
+
+  registerGauge(MetricRegistry.name("memory", "onHeapMemUsed_MB"),
+    _.getStorageStatus.map(_.onHeapMemUsed.getOrElse(0L)).sum)
+
+  registerGauge(MetricRegistry.name("memory", "offHeapMemUsed_MB"),
+    _.getStorageStatus.map(_.offHeapMemUsed.getOrElse(0L)).sum)
+
+  registerGauge(MetricRegistry.name("disk", "diskSpaceUsed_MB"),
+    _.getStorageStatus.map(_.diskUsed).sum)
 }
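
The refactored `registerGauge` helper keeps each metric a one-liner, and the six new gauges sit alongside the existing ones under the `BlockManager` source. As a hedged sketch, any standard sink picks them up without further changes, e.g. the console sink (keys as in Spark's `metrics.properties.template`; the exact gauge name prefix depends on the configured metrics namespace):

```properties
# conf/metrics.properties -- print all metrics every 10 seconds, including
# BlockManager.memory.onHeapMemUsed_MB and BlockManager.memory.offHeapMemUsed_MB.
*.sink.console.class=org.apache.spark.metrics.sink.ConsoleSink
*.sink.console.period=10
*.sink.console.unit=seconds
```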

http://git-wip-us.apache.org/repos/asf/spark/blob/a4491626/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala b/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
index 798658a..1b30d4f 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
@@ -41,7 +41,7 @@ class StorageStatusListener(conf: SparkConf) extends SparkListener {
   }
 
   def deadStorageStatusList: Seq[StorageStatus] = synchronized {
-    deadExecutorStorageStatus.toSeq
+    deadExecutorStorageStatus
   }
 
   /** Update storage status list to reflect updated block statuses */
@@ -74,8 +74,10 @@ class StorageStatusListener(conf: SparkConf) extends SparkListener {
     synchronized {
       val blockManagerId = blockManagerAdded.blockManagerId
       val executorId = blockManagerId.executorId
-      val maxMem = blockManagerAdded.maxMem
-      val storageStatus = new StorageStatus(blockManagerId, maxMem)
+      // The onHeap and offHeap memory are always defined for new applications,
+      // but they can be missing if we are replaying old event logs.
+      val storageStatus = new StorageStatus(blockManagerId, blockManagerAdded.maxMem,
+        blockManagerAdded.maxOnHeapMem, blockManagerAdded.maxOffHeapMem)
       executorIdToStorageStatus(executorId) = storageStatus
 
       // Try to remove the dead storage status if same executor register the block manager twice.

http://git-wip-us.apache.org/repos/asf/spark/blob/a4491626/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
index 241aacd..8f0d181 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
@@ -35,7 +35,11 @@ import org.apache.spark.internal.Logging
  * class cannot mutate the source of the information. Accesses are not thread-safe.
  */
 @DeveloperApi
-class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) {
+class StorageStatus(
+    val blockManagerId: BlockManagerId,
+    val maxMemory: Long,
+    val maxOnHeapMem: Option[Long],
+    val maxOffHeapMem: Option[Long]) {
 
   /**
    * Internal representation of the blocks stored in this block manager.
@@ -46,25 +50,21 @@ class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) {
   private val _rddBlocks = new mutable.HashMap[Int, mutable.Map[BlockId, BlockStatus]]
   private val _nonRddBlocks = new mutable.HashMap[BlockId, BlockStatus]
 
-  /**
-   * Storage information of the blocks that entails memory, disk, and off-heap memory usage.
-   *
-   * As with the block maps, we store the storage information separately for RDD blocks and
-   * non-RDD blocks for the same reason. In particular, RDD storage information is stored
-   * in a map indexed by the RDD ID to the following 4-tuple:
-   *
-   *   (memory size, disk size, storage level)
-   *
-   * We assume that all the blocks that belong to the same RDD have the same storage level.
-   * This field is not relevant to non-RDD blocks, however, so the storage information for
-   * non-RDD blocks contains only the first 3 fields (in the same order).
-   */
-  private val _rddStorageInfo = new mutable.HashMap[Int, (Long, Long, StorageLevel)]
-  private var _nonRddStorageInfo: (Long, Long) = (0L, 0L)
+  private case class RddStorageInfo(memoryUsage: Long, diskUsage: Long, level: StorageLevel)
+  private val _rddStorageInfo = new mutable.HashMap[Int, RddStorageInfo]
+
+  private case class NonRddStorageInfo(var onHeapUsage: Long, var offHeapUsage: Long,
+      var diskUsage: Long)
+  private val _nonRddStorageInfo = NonRddStorageInfo(0L, 0L, 0L)
 
   /** Create a storage status with an initial set of blocks, leaving the source unmodified. */
-  def this(bmid: BlockManagerId, maxMem: Long, initialBlocks: Map[BlockId, BlockStatus]) {
-    this(bmid, maxMem)
+  def this(
+      bmid: BlockManagerId,
+      maxMemory: Long,
+      maxOnHeapMem: Option[Long],
+      maxOffHeapMem: Option[Long],
+      initialBlocks: Map[BlockId, BlockStatus]) {
+    this(bmid, maxMemory, maxOnHeapMem, maxOffHeapMem)
     initialBlocks.foreach { case (bid, bstatus) => addBlock(bid, bstatus) }
   }
 
@@ -176,26 +176,57 @@ class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) {
    */
   def numRddBlocksById(rddId: Int): Int = _rddBlocks.get(rddId).map(_.size).getOrElse(0)
 
+  /** Return the max memory that can be used by this block manager. */
+  def maxMem: Long = maxMemory
+
   /** Return the memory remaining in this block manager. */
   def memRemaining: Long = maxMem - memUsed
 
+  /** Return the memory used by caching RDDs */
+  def cacheSize: Long = onHeapCacheSize.getOrElse(0L) + offHeapCacheSize.getOrElse(0L)
+
   /** Return the memory used by this block manager. */
-  def memUsed: Long = _nonRddStorageInfo._1 + cacheSize
+  def memUsed: Long = onHeapMemUsed.getOrElse(0L) + offHeapMemUsed.getOrElse(0L)
 
-  /** Return the memory used by caching RDDs */
-  def cacheSize: Long = _rddBlocks.keys.toSeq.map(memUsedByRdd).sum
+  /** Return the on-heap memory remaining in this block manager. */
+  def onHeapMemRemaining: Option[Long] =
+    for (m <- maxOnHeapMem; o <- onHeapMemUsed) yield m - o
+
+  /** Return the off-heap memory remaining in this block manager. */
+  def offHeapMemRemaining: Option[Long] =
+    for (m <- maxOffHeapMem; o <- offHeapMemUsed) yield m - o
+
+  /** Return the on-heap memory used by this block manager. */
+  def onHeapMemUsed: Option[Long] = onHeapCacheSize.map(_ + _nonRddStorageInfo.onHeapUsage)
+
+  /** Return the off-heap memory used by this block manager. */
+  def offHeapMemUsed: Option[Long] = offHeapCacheSize.map(_ + _nonRddStorageInfo.offHeapUsage)
+
+  /** Return the memory used by on-heap caching RDDs */
+  def onHeapCacheSize: Option[Long] = maxOnHeapMem.map { _ =>
+    _rddStorageInfo.collect {
+      case (_, storageInfo) if !storageInfo.level.useOffHeap => storageInfo.memoryUsage
+    }.sum
+  }
+
+  /** Return the memory used by off-heap caching RDDs */
+  def offHeapCacheSize: Option[Long] = maxOffHeapMem.map { _ =>
+    _rddStorageInfo.collect {
+      case (_, storageInfo) if storageInfo.level.useOffHeap => storageInfo.memoryUsage
+    }.sum
+  }
 
   /** Return the disk space used by this block manager. */
-  def diskUsed: Long = _nonRddStorageInfo._2 + _rddBlocks.keys.toSeq.map(diskUsedByRdd).sum
+  def diskUsed: Long = _nonRddStorageInfo.diskUsage + _rddBlocks.keys.toSeq.map(diskUsedByRdd).sum
 
   /** Return the memory used by the given RDD in this block manager in O(1) time. */
-  def memUsedByRdd(rddId: Int): Long = _rddStorageInfo.get(rddId).map(_._1).getOrElse(0L)
+  def memUsedByRdd(rddId: Int): Long = _rddStorageInfo.get(rddId).map(_.memoryUsage).getOrElse(0L)
 
   /** Return the disk space used by the given RDD in this block manager in O(1) time. */
-  def diskUsedByRdd(rddId: Int): Long = _rddStorageInfo.get(rddId).map(_._2).getOrElse(0L)
+  def diskUsedByRdd(rddId: Int): Long = _rddStorageInfo.get(rddId).map(_.diskUsage).getOrElse(0L)
 
   /** Return the storage level, if any, used by the given RDD in this block manager. */
-  def rddStorageLevel(rddId: Int): Option[StorageLevel] = _rddStorageInfo.get(rddId).map(_._3)
+  def rddStorageLevel(rddId: Int): Option[StorageLevel] = _rddStorageInfo.get(rddId).map(_.level)
 
   /**
    * Update the relevant storage info, taking into account any existing status for this block.
@@ -210,10 +241,12 @@ class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) {
     val (oldMem, oldDisk) = blockId match {
       case RDDBlockId(rddId, _) =>
         _rddStorageInfo.get(rddId)
-          .map { case (mem, disk, _) => (mem, disk) }
+          .map { case RddStorageInfo(mem, disk, _) => (mem, disk) }
           .getOrElse((0L, 0L))
-      case _ =>
-        _nonRddStorageInfo
+      case _ if !level.useOffHeap =>
+        (_nonRddStorageInfo.onHeapUsage, _nonRddStorageInfo.diskUsage)
+      case _ if level.useOffHeap =>
+        (_nonRddStorageInfo.offHeapUsage, _nonRddStorageInfo.diskUsage)
     }
     val newMem = math.max(oldMem + changeInMem, 0L)
     val newDisk = math.max(oldDisk + changeInDisk, 0L)
@@ -225,13 +258,17 @@ class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) {
         if (newMem + newDisk == 0) {
           _rddStorageInfo.remove(rddId)
         } else {
-          _rddStorageInfo(rddId) = (newMem, newDisk, level)
+          _rddStorageInfo(rddId) = RddStorageInfo(newMem, newDisk, level)
         }
       case _ =>
-        _nonRddStorageInfo = (newMem, newDisk)
+        if (!level.useOffHeap) {
+          _nonRddStorageInfo.onHeapUsage = newMem
+        } else {
+          _nonRddStorageInfo.offHeapUsage = newMem
+        }
+        _nonRddStorageInfo.diskUsage = newDisk
     }
   }
-
 }
 
 /** Helper methods for storage-related objects. */
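
A quick sketch of the Option plumbing above (illustrative values, not from the patch): when `maxOnHeapMem` is absent, as it is for replayed old event logs, the derived quantities stay `None`, and callers decide how to render them (the UI falls back to 0 via `getOrElse(0L)`):

```scala
// Old event log: no on-heap maximum was recorded.
val maxOnHeapMem: Option[Long] = None
// onHeapMemUsed is derived from maxOnHeapMem via map, so it is None too.
val onHeapMemUsed: Option[Long] = maxOnHeapMem.map(_ => 0L)

// Mirrors StorageStatus.onHeapMemRemaining in the hunk above.
val onHeapMemRemaining: Option[Long] =
  for (m <- maxOnHeapMem; o <- onHeapMemUsed) yield m - o

assert(onHeapMemRemaining.isEmpty) // no figure, rather than a wrong one
```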

http://git-wip-us.apache.org/repos/asf/spark/blob/a4491626/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
index d849ce7..0a3c63d 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
@@ -40,7 +40,8 @@ private[ui] case class ExecutorSummaryInfo(
     totalShuffleRead: Long,
     totalShuffleWrite: Long,
     isBlacklisted: Int,
-    maxMemory: Long,
+    maxOnHeapMem: Long,
+    maxOffHeapMem: Long,
     executorLogs: Map[String, String])
 
 
@@ -53,6 +54,34 @@ private[ui] class ExecutorsPage(
     val content =
       <div>
         {
+          <div>
+            <span class="expand-additional-metrics">
+              <span class="expand-additional-metrics-arrow arrow-closed"></span>
+              <a>Show Additional Metrics</a>
+            </span>
+            <div class="additional-metrics collapsed">
+              <ul>
+                <li>
+                  <input type="checkbox" id="select-all-metrics"/>
+                  <span class="additional-metric-title"><em>(De)select All</em></span>
+                </li>
+                <li>
+                  <span data-toggle="tooltip"
+                        title={ExecutorsPage.ON_HEAP_MEMORY_TOOLTIP} data-placement="right">
+                    <input type="checkbox" name="on_heap_memory"/>
+                    <span class="additional-metric-title">On Heap Storage Memory</span>
+                  </span>
+                </li>
+                <li>
+                  <span data-toggle="tooltip"
+                        title={ExecutorsPage.OFF_HEAP_MEMORY_TOOLTIP} data-placement="right">
+                    <input type="checkbox" name="off_heap_memory"/>
+                    <span class="additional-metric-title">Off Heap Storage Memory</span>
+                  </span>
+                </li>
+              </ul>
+            </div>
+          </div> ++
           <div id="active-executors"></div> ++
           <script src={UIUtils.prependBaseUri("/static/utils.js")}></script> ++
           <script src={UIUtils.prependBaseUri("/static/executorspage.js")}></script> ++
@@ -65,6 +94,11 @@ private[ui] class ExecutorsPage(
 }
 
 private[spark] object ExecutorsPage {
+  private val ON_HEAP_MEMORY_TOOLTIP = "Memory used / total available memory for on heap " +
+    "storage of data like RDD partitions cached in memory."
+  private val OFF_HEAP_MEMORY_TOOLTIP = "Memory used / total available memory for off heap " +
+    "storage of data like RDD partitions cached in memory."
+
   /** Represent an executor's info as a map given a storage status index */
   def getExecInfo(
       listener: ExecutorsListener,
@@ -80,6 +114,10 @@ private[spark] object ExecutorsPage {
     val rddBlocks = status.numBlocks
     val memUsed = status.memUsed
     val maxMem = status.maxMem
+    val onHeapMemUsed = status.onHeapMemUsed
+    val offHeapMemUsed = status.offHeapMemUsed
+    val maxOnHeapMem = status.maxOnHeapMem
+    val maxOffHeapMem = status.maxOffHeapMem
     val diskUsed = status.diskUsed
     val taskSummary = listener.executorToTaskSummary.getOrElse(execId, ExecutorTaskSummary(execId))
 
@@ -103,7 +141,11 @@ private[spark] object ExecutorsPage {
       taskSummary.shuffleWrite,
       taskSummary.isBlacklisted,
       maxMem,
-      taskSummary.executorLogs
+      taskSummary.executorLogs,
+      onHeapMemUsed,
+      offHeapMemUsed,
+      maxOnHeapMem,
+      maxOffHeapMem
     )
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/a4491626/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
index 227e940..a1a0c72 100644
--- a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
@@ -147,7 +147,8 @@ private[ui] class RDDPage(parent: StorageTab) extends WebUIPage("rdd") {
   /** Header fields for the worker table */
   private def workerHeader = Seq(
     "Host",
-    "Memory Usage",
+    "On Heap Memory Usage",
+    "Off Heap Memory Usage",
     "Disk Usage")
 
   /** Render an HTML row representing a worker */
@@ -155,8 +156,12 @@ private[ui] class RDDPage(parent: StorageTab) extends WebUIPage("rdd") {
     <tr>
       <td>{worker.address}</td>
       <td>
-        {Utils.bytesToString(worker.memoryUsed)}
-        ({Utils.bytesToString(worker.memoryRemaining)} Remaining)
+        {Utils.bytesToString(worker.onHeapMemoryUsed.getOrElse(0L))}
+        ({Utils.bytesToString(worker.onHeapMemoryRemaining.getOrElse(0L))} Remaining)
+      </td>
+      <td>
+        {Utils.bytesToString(worker.offHeapMemoryUsed.getOrElse(0L))}
+        ({Utils.bytesToString(worker.offHeapMemoryRemaining.getOrElse(0L))} Remaining)
       </td>
       <td>{Utils.bytesToString(worker.diskUsed)}</td>
     </tr>

http://git-wip-us.apache.org/repos/asf/spark/blob/a4491626/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 1d2cb7a..8296c42 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -182,7 +182,9 @@ private[spark] object JsonProtocol {
     ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.blockManagerAdded) ~
     ("Block Manager ID" -> blockManagerId) ~
     ("Maximum Memory" -> blockManagerAdded.maxMem) ~
-    ("Timestamp" -> blockManagerAdded.time)
+    ("Timestamp" -> blockManagerAdded.time) ~
+    ("Maximum Onheap Memory" -> blockManagerAdded.maxOnHeapMem) ~
+    ("Maximum Offheap Memory" -> blockManagerAdded.maxOffHeapMem)
   }
 
   def blockManagerRemovedToJson(blockManagerRemoved: SparkListenerBlockManagerRemoved): JValue = {
@@ -612,7 +614,9 @@ private[spark] object JsonProtocol {
     val blockManagerId = blockManagerIdFromJson(json \ "Block Manager ID")
     val maxMem = (json \ "Maximum Memory").extract[Long]
     val time = Utils.jsonOption(json \ "Timestamp").map(_.extract[Long]).getOrElse(-1L)
-    SparkListenerBlockManagerAdded(time, blockManagerId, maxMem)
+    val maxOnHeapMem = Utils.jsonOption(json \ "Maximum Onheap Memory").map(_.extract[Long])
+    val maxOffHeapMem = Utils.jsonOption(json \ "Maximum Offheap Memory").map(_.extract[Long])
+    SparkListenerBlockManagerAdded(time, blockManagerId, maxMem, maxOnHeapMem, maxOffHeapMem)
   }
 
   def blockManagerRemovedFromJson(json: JValue): SparkListenerBlockManagerRemoved = {

http://git-wip-us.apache.org/repos/asf/spark/blob/a4491626/core/src/test/resources/HistoryServerExpectations/executor_memory_usage_expectation.json
----------------------------------------------------------------------
diff --git a/core/src/test/resources/HistoryServerExpectations/executor_memory_usage_expectation.json b/core/src/test/resources/HistoryServerExpectations/executor_memory_usage_expectation.json
new file mode 100644
index 0000000..e732af2
--- /dev/null
+++ b/core/src/test/resources/HistoryServerExpectations/executor_memory_usage_expectation.json
@@ -0,0 +1,139 @@
+[ {
+  "id" : "2",
+  "hostPort" : "172.22.0.167:51487",
+  "isActive" : true,
+  "rddBlocks" : 0,
+  "memoryUsed" : 0,
+  "diskUsed" : 0,
+  "totalCores" : 4,
+  "maxTasks" : 4,
+  "activeTasks" : 0,
+  "failedTasks" : 4,
+  "completedTasks" : 0,
+  "totalTasks" : 4,
+  "totalDuration" : 2537,
+  "totalGCTime" : 88,
+  "totalInputBytes" : 0,
+  "totalShuffleRead" : 0,
+  "totalShuffleWrite" : 0,
+  "isBlacklisted" : true,
+  "maxMemory" : 908381388,
+  "executorLogs" : {
+    "stdout" : "http://172.22.0.167:51469/logPage/?appId=app-20161116163331-0000&executorId=2&logType=stdout",
+    "stderr" : "http://172.22.0.167:51469/logPage/?appId=app-20161116163331-0000&executorId=2&logType=stderr"
+  },
+  "onHeapMemoryUsed" : 0,
+  "offHeapMemoryUsed" : 0,
+  "maxOnHeapMemory" : 384093388,
+  "maxOffHeapMemory" : 524288000
+}, {
+  "id" : "driver",
+  "hostPort" : "172.22.0.167:51475",
+  "isActive" : true,
+  "rddBlocks" : 0,
+  "memoryUsed" : 0,
+  "diskUsed" : 0,
+  "totalCores" : 0,
+  "maxTasks" : 0,
+  "activeTasks" : 0,
+  "failedTasks" : 0,
+  "completedTasks" : 0,
+  "totalTasks" : 0,
+  "totalDuration" : 0,
+  "totalGCTime" : 0,
+  "totalInputBytes" : 0,
+  "totalShuffleRead" : 0,
+  "totalShuffleWrite" : 0,
+  "isBlacklisted" : true,
+  "maxMemory" : 908381388,
+  "executorLogs" : { },
+  "onHeapMemoryUsed" : 0,
+  "offHeapMemoryUsed" : 0,
+  "maxOnHeapMemory" : 384093388,
+  "maxOffHeapMemory" : 524288000
+}, {
+  "id" : "1",
+  "hostPort" : "172.22.0.167:51490",
+  "isActive" : true,
+  "rddBlocks" : 0,
+  "memoryUsed" : 0,
+  "diskUsed" : 0,
+  "totalCores" : 4,
+  "maxTasks" : 4,
+  "activeTasks" : 0,
+  "failedTasks" : 0,
+  "completedTasks" : 4,
+  "totalTasks" : 4,
+  "totalDuration" : 3152,
+  "totalGCTime" : 68,
+  "totalInputBytes" : 0,
+  "totalShuffleRead" : 0,
+  "totalShuffleWrite" : 0,
+  "isBlacklisted" : true,
+  "maxMemory" : 908381388,
+  "executorLogs" : {
+    "stdout" : "http://172.22.0.167:51467/logPage/?appId=app-20161116163331-0000&executorId=1&logType=stdout",
+    "stderr" : "http://172.22.0.167:51467/logPage/?appId=app-20161116163331-0000&executorId=1&logType=stderr"
+  },
+
+  "onHeapMemoryUsed" : 0,
+  "offHeapMemoryUsed" : 0,
+  "maxOnHeapMemory" : 384093388,
+  "maxOffHeapMemory" : 524288000
+}, {
+  "id" : "0",
+  "hostPort" : "172.22.0.167:51491",
+  "isActive" : true,
+  "rddBlocks" : 0,
+  "memoryUsed" : 0,
+  "diskUsed" : 0,
+  "totalCores" : 4,
+  "maxTasks" : 4,
+  "activeTasks" : 0,
+  "failedTasks" : 4,
+  "completedTasks" : 0,
+  "totalTasks" : 4,
+  "totalDuration" : 2551,
+  "totalGCTime" : 116,
+  "totalInputBytes" : 0,
+  "totalShuffleRead" : 0,
+  "totalShuffleWrite" : 0,
+  "isBlacklisted" : true,
+  "maxMemory" : 908381388,
+  "executorLogs" : {
+    "stdout" : "http://172.22.0.167:51465/logPage/?appId=app-20161116163331-0000&executorId=0&logType=stdout",
+    "stderr" : "http://172.22.0.167:51465/logPage/?appId=app-20161116163331-0000&executorId=0&logType=stderr"
+  },
+  "onHeapMemoryUsed" : 0,
+  "offHeapMemoryUsed" : 0,
+  "maxOnHeapMemory" : 384093388,
+  "maxOffHeapMemory" : 524288000
+}, {
+  "id" : "3",
+  "hostPort" : "172.22.0.167:51485",
+  "isActive" : true,
+  "rddBlocks" : 0,
+  "memoryUsed" : 0,
+  "diskUsed" : 0,
+  "totalCores" : 4,
+  "maxTasks" : 4,
+  "activeTasks" : 0,
+  "failedTasks" : 0,
+  "completedTasks" : 12,
+  "totalTasks" : 12,
+  "totalDuration" : 2453,
+  "totalGCTime" : 72,
+  "totalInputBytes" : 0,
+  "totalShuffleRead" : 0,
+  "totalShuffleWrite" : 0,
+  "isBlacklisted" : true,
+  "maxMemory" : 908381388,
+  "executorLogs" : {
+    "stdout" : "http://172.22.0.167:51466/logPage/?appId=app-20161116163331-0000&executorId=3&logType=stdout",
+    "stderr" : "http://172.22.0.167:51466/logPage/?appId=app-20161116163331-0000&executorId=3&logType=stderr"
+  },
+  "onHeapMemoryUsed" : 0,
+  "offHeapMemoryUsed" : 0,
+  "maxOnHeapMemory" : 384093388,
+  "maxOffHeapMemory" : 524288000
+} ]

http://git-wip-us.apache.org/repos/asf/spark/blob/a4491626/core/src/test/resources/HistoryServerExpectations/executor_node_blacklisting_expectation.json
----------------------------------------------------------------------
diff --git a/core/src/test/resources/HistoryServerExpectations/executor_node_blacklisting_expectation.json b/core/src/test/resources/HistoryServerExpectations/executor_node_blacklisting_expectation.json
index 5914a1c..e732af2 100644
--- a/core/src/test/resources/HistoryServerExpectations/executor_node_blacklisting_expectation.json
+++ b/core/src/test/resources/HistoryServerExpectations/executor_node_blacklisting_expectation.json
@@ -17,11 +17,15 @@
   "totalShuffleRead" : 0,
   "totalShuffleWrite" : 0,
   "isBlacklisted" : true,
-  "maxMemory" : 384093388,
+  "maxMemory" : 908381388,
   "executorLogs" : {
     "stdout" : "http://172.22.0.167:51469/logPage/?appId=app-20161116163331-0000&executorId=2&logType=stdout",
     "stderr" : "http://172.22.0.167:51469/logPage/?appId=app-20161116163331-0000&executorId=2&logType=stderr"
-  }
+  },
+  "onHeapMemoryUsed" : 0,
+  "offHeapMemoryUsed" : 0,
+  "maxOnHeapMemory" : 384093388,
+  "maxOffHeapMemory" : 524288000
 }, {
   "id" : "driver",
   "hostPort" : "172.22.0.167:51475",
@@ -41,8 +45,12 @@
   "totalShuffleRead" : 0,
   "totalShuffleWrite" : 0,
   "isBlacklisted" : true,
-  "maxMemory" : 384093388,
-  "executorLogs" : { }
+  "maxMemory" : 908381388,
+  "executorLogs" : { },
+  "onHeapMemoryUsed" : 0,
+  "offHeapMemoryUsed" : 0,
+  "maxOnHeapMemory" : 384093388,
+  "maxOffHeapMemory" : 524288000
 }, {
   "id" : "1",
   "hostPort" : "172.22.0.167:51490",
@@ -62,11 +70,16 @@
   "totalShuffleRead" : 0,
   "totalShuffleWrite" : 0,
   "isBlacklisted" : true,
-  "maxMemory" : 384093388,
+  "maxMemory" : 908381388,
   "executorLogs" : {
     "stdout" : "http://172.22.0.167:51467/logPage/?appId=app-20161116163331-0000&executorId=1&logType=stdout",
     "stderr" : "http://172.22.0.167:51467/logPage/?appId=app-20161116163331-0000&executorId=1&logType=stderr"
-  }
+  },
+
+  "onHeapMemoryUsed" : 0,
+  "offHeapMemoryUsed" : 0,
+  "maxOnHeapMemory" : 384093388,
+  "maxOffHeapMemory" : 524288000
 }, {
   "id" : "0",
   "hostPort" : "172.22.0.167:51491",
@@ -86,11 +99,15 @@
   "totalShuffleRead" : 0,
   "totalShuffleWrite" : 0,
   "isBlacklisted" : true,
-  "maxMemory" : 384093388,
+  "maxMemory" : 908381388,
   "executorLogs" : {
     "stdout" : "http://172.22.0.167:51465/logPage/?appId=app-20161116163331-0000&executorId=0&logType=stdout",
     "stderr" : "http://172.22.0.167:51465/logPage/?appId=app-20161116163331-0000&executorId=0&logType=stderr"
-  }
+  },
+  "onHeapMemoryUsed" : 0,
+  "offHeapMemoryUsed" : 0,
+  "maxOnHeapMemory" : 384093388,
+  "maxOffHeapMemory" : 524288000
 }, {
   "id" : "3",
   "hostPort" : "172.22.0.167:51485",
@@ -110,9 +127,13 @@
   "totalShuffleRead" : 0,
   "totalShuffleWrite" : 0,
   "isBlacklisted" : true,
-  "maxMemory" : 384093388,
+  "maxMemory" : 908381388,
   "executorLogs" : {
     "stdout" : "http://172.22.0.167:51466/logPage/?appId=app-20161116163331-0000&executorId=3&logType=stdout",
     "stderr" : "http://172.22.0.167:51466/logPage/?appId=app-20161116163331-0000&executorId=3&logType=stderr"
-  }
+  },
+  "onHeapMemoryUsed" : 0,
+  "offHeapMemoryUsed" : 0,
+  "maxOnHeapMemory" : 384093388,
+  "maxOffHeapMemory" : 524288000
 } ]


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org