Posted to commits@spark.apache.org by tg...@apache.org on 2020/05/21 18:12:07 UTC

[spark] branch master updated: [SPARK-29303][WEB UI] Add UI support for stage level scheduling

This is an automated email from the ASF dual-hosted git repository.

tgraves pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new b64688e  [SPARK-29303][WEB UI] Add UI support for stage level scheduling
b64688e is described below

commit b64688ebbaac7afd3734c0d84d1e77b1fd2d2e9d
Author: Thomas Graves <tg...@nvidia.com>
AuthorDate: Thu May 21 13:11:35 2020 -0500

    [SPARK-29303][WEB UI] Add UI support for stage level scheduling
    
    ### What changes were proposed in this pull request?
    
    This adds UI updates to support stage level scheduling and ResourceProfiles. Three main things have been added: the ResourceProfile id is now shown on the Stage page, the Executors page has an optional selectable column showing the ResourceProfile Id of each executor, and the Environment page has a new section listing the ResourceProfile ids. Along with this, the REST API for the environment page was updated to include the ResourceProfile information.
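
    For example (a hedged illustration: placeholder host, abbreviated response, illustrative values; the application id is taken from the test data added in this patch), the environment endpoint now carries the new `resourceProfiles` field:

        GET http://<history-server>:18080/api/v1/applications/application_1578436911597_0052/environment

        {
          ...
          "resourceProfiles" : [ {
            "id" : 0,
            "executorResources" : { ... },
            "taskResources" : { ... }
          } ]
        }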
    
    I debated splitting the resource profile information into its own page, but I wasn't sure it called for a completely separate page. Open to people's thoughts on this.
    
    Screenshots:
    ![Screen Shot 2020-04-01 at 3 07 46 PM](https://user-images.githubusercontent.com/4563792/78185169-469a7000-7430-11ea-8b0c-d9ede2d41df8.png)
    ![Screen Shot 2020-04-01 at 3 08 14 PM](https://user-images.githubusercontent.com/4563792/78185175-48fcca00-7430-11ea-8d1d-6b9333700f32.png)
    ![Screen Shot 2020-04-01 at 3 09 03 PM](https://user-images.githubusercontent.com/4563792/78185176-4a2df700-7430-11ea-92d9-73c382bb0f32.png)
    ![Screen Shot 2020-04-01 at 11 05 48 AM](https://user-images.githubusercontent.com/4563792/78185186-4dc17e00-7430-11ea-8962-f749dd47ea60.png)
    
    ### Why are the changes needed?
    
    So users can see which resource profile was used with which stages and executors. The resource profile contents are also available, so a user debugging an application can see exactly what resources were requested with that profile.
    
    ### Does this PR introduce any user-facing change?
    
    Yes, UI updates.
    
    ### How was this patch tested?
    
    Unit tests, plus manual testing on YARN with both active applications and the history server.
    
    Closes #28094 from tgravescs/SPARK-29303-pr.
    
    Lead-authored-by: Thomas Graves <tg...@nvidia.com>
    Co-authored-by: Thomas Graves <tg...@apache.org>
    Signed-off-by: Thomas Graves <tg...@apache.org>
---
 .../org/apache/spark/SparkFirehoseListener.java    |   5 +
 .../spark/ui/static/executorspage-template.html    |   1 +
 .../org/apache/spark/ui/static/executorspage.js    |   7 +-
 .../main/scala/org/apache/spark/SparkContext.scala |   2 +-
 .../deploy/history/HistoryAppStatusStore.scala     |   3 +-
 .../spark/resource/ResourceProfileManager.scala    |   7 +-
 .../spark/scheduler/EventLoggingListener.scala     |   4 +
 .../org/apache/spark/scheduler/SparkListener.scala |  12 +++
 .../apache/spark/scheduler/SparkListenerBus.scala  |   2 +
 .../apache/spark/status/AppStatusListener.scala    |  33 +++++-
 .../org/apache/spark/status/AppStatusStore.scala   |   8 +-
 .../scala/org/apache/spark/status/LiveEntity.scala |  25 ++++-
 .../status/api/v1/OneApplicationResource.scala     |   4 +-
 .../scala/org/apache/spark/status/api/v1/api.scala |  18 +++-
 .../scala/org/apache/spark/status/storeTypes.scala |   7 ++
 .../org/apache/spark/ui/env/EnvironmentPage.scala  |  48 +++++++++
 .../scala/org/apache/spark/ui/jobs/JobPage.scala   |   4 +-
 .../scala/org/apache/spark/ui/jobs/StagePage.scala |   4 +
 .../scala/org/apache/spark/util/JsonProtocol.scala | 101 +++++++++++++++++--
 .../app_environment_expectation.json               |   3 +-
 .../application_list_json_expectation.json         |  15 +++
 .../blacklisting_for_stage_expectation.json        |   3 +-
 .../blacklisting_node_for_stage_expectation.json   |   3 +-
 .../complete_stage_list_json_expectation.json      |   9 +-
 .../completed_app_list_json_expectation.json       |  15 +++
 .../executor_list_json_expectation.json            |   3 +-
 ...ist_with_executor_metrics_json_expectation.json |  12 ++-
 .../executor_memory_usage_expectation.json         |  15 ++-
 .../executor_node_blacklisting_expectation.json    |  15 ++-
 ...de_blacklisting_unblacklisting_expectation.json |  15 ++-
 .../executor_resource_information_expectation.json |   9 +-
 .../failed_stage_list_json_expectation.json        |   3 +-
 .../limit_app_list_json_expectation.json           |  30 +++---
 .../minDate_app_list_json_expectation.json         |  18 +++-
 .../minEndDate_app_list_json_expectation.json      |  15 +++
 .../multiple_resource_profiles_expectation.json    | 112 +++++++++++++++++++++
 .../one_stage_attempt_json_expectation.json        |   3 +-
 .../one_stage_json_expectation.json                |   3 +-
 .../stage_list_json_expectation.json               |  12 ++-
 ...age_list_with_accumulable_json_expectation.json |   3 +-
 .../stage_with_accumulable_json_expectation.json   |   3 +-
 .../spark-events/application_1578436911597_0052    |  27 +++++
 .../spark/ExecutorAllocationManagerSuite.scala     |   2 +-
 .../spark/deploy/history/HistoryServerSuite.scala  |   1 +
 .../resource/ResourceProfileManagerSuite.scala     |  13 ++-
 .../scala/org/apache/spark/ui/StagePageSuite.scala |   3 +-
 .../org/apache/spark/util/JsonProtocolSuite.scala  |  94 +++++++++++++++--
 dev/.rat-excludes                                  |   1 +
 .../KubernetesClusterSchedulerBackendSuite.scala   |   5 +-
 49 files changed, 657 insertions(+), 103 deletions(-)

diff --git a/core/src/main/java/org/apache/spark/SparkFirehoseListener.java b/core/src/main/java/org/apache/spark/SparkFirehoseListener.java
index 731f6fc..579e7ff 100644
--- a/core/src/main/java/org/apache/spark/SparkFirehoseListener.java
+++ b/core/src/main/java/org/apache/spark/SparkFirehoseListener.java
@@ -163,6 +163,11 @@ public class SparkFirehoseListener implements SparkListenerInterface {
   }
 
   @Override
+  public void onResourceProfileAdded(SparkListenerResourceProfileAdded event) {
+    onEvent(event);
+  }
+
+  @Override
   public void onOtherEvent(SparkListenerEvent event) {
     onEvent(event);
   }
diff --git a/core/src/main/resources/org/apache/spark/ui/static/executorspage-template.html b/core/src/main/resources/org/apache/spark/ui/static/executorspage-template.html
index 0b26bfc..0729dfe 100644
--- a/core/src/main/resources/org/apache/spark/ui/static/executorspage-template.html
+++ b/core/src/main/resources/org/apache/spark/ui/static/executorspage-template.html
@@ -89,6 +89,7 @@ limitations under the License.
           <th>Disk Used</th>
           <th>Cores</th>
           <th>Resources</th>
+          <th>Resource Profile Id</th>
           <th><span data-toggle="tooltip" data-placement="top" title="Number of tasks currently executing. Darker shading highlights executors with more active tasks.">Active Tasks</span></th>
           <th><span data-toggle="tooltip" data-placement="top" title="Number of tasks that have failed on this executor. Darker shading highlights executors with a high proportion of failed tasks.">Failed Tasks</span></th>
           <th>Complete Tasks</th>
diff --git a/core/src/main/resources/org/apache/spark/ui/static/executorspage.js b/core/src/main/resources/org/apache/spark/ui/static/executorspage.js
index ec57797..520edb9 100644
--- a/core/src/main/resources/org/apache/spark/ui/static/executorspage.js
+++ b/core/src/main/resources/org/apache/spark/ui/static/executorspage.js
@@ -119,7 +119,7 @@ function totalDurationColor(totalGCTime, totalDuration) {
 }
 
 var sumOptionalColumns = [3, 4];
-var execOptionalColumns = [5, 6, 9];
+var execOptionalColumns = [5, 6, 9, 10];
 var execDataTable;
 var sumDataTable;
 
@@ -415,6 +415,7 @@ $(document).ready(function () {
                         {data: 'diskUsed', render: formatBytes},
                         {data: 'totalCores'},
                         {name: 'resourcesCol', data: 'resources', render: formatResourceCells, orderable: false},
+                        {name: 'resourceProfileIdCol', data: 'resourceProfileId'},
                         {
                             data: 'activeTasks',
                             "fnCreatedCell": function (nTd, sData, oData, iRow, iCol) {
@@ -461,7 +462,8 @@ $(document).ready(function () {
                     "columnDefs": [
                         {"visible": false, "targets": 5},
                         {"visible": false, "targets": 6},
-                        {"visible": false, "targets": 9}
+                        {"visible": false, "targets": 9},
+                        {"visible": false, "targets": 10}
                     ],
                     "deferRender": true
                 };
@@ -570,6 +572,7 @@ $(document).ready(function () {
                     "<div id='on_heap_memory' class='on-heap-memory-checkbox-div'><input type='checkbox' class='toggle-vis' data-sum-col-idx='3' data-exec-col-idx='5'>On Heap Memory</div>" +
                     "<div id='off_heap_memory' class='off-heap-memory-checkbox-div'><input type='checkbox' class='toggle-vis' data-sum-col-idx='4' data-exec-col-idx='6'>Off Heap Memory</div>" +
                     "<div id='extra_resources' class='resources-checkbox-div'><input type='checkbox' class='toggle-vis' data-sum-col-idx='' data-exec-col-idx='9'>Resources</div>" +
+                    "<div id='resource_prof_id' class='resource-prof-id-checkbox-div'><input type='checkbox' class='toggle-vis' data-sum-col-idx='' data-exec-col-idx='10'>Resource Profile Id</div>" +
                     "</div>");
 
                 reselectCheckboxesBasedOnTaskTableState();
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 5c92527..38d7319 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -435,7 +435,7 @@ class SparkContext(config: SparkConf) extends Logging {
     }
 
     _listenerBus = new LiveListenerBus(_conf)
-    _resourceProfileManager = new ResourceProfileManager(_conf)
+    _resourceProfileManager = new ResourceProfileManager(_conf, _listenerBus)
 
     // Initialize the app status store and listener before SparkEnv is created so that it gets
     // all events.
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryAppStatusStore.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryAppStatusStore.scala
index 74105002..7973652 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryAppStatusStore.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryAppStatusStore.scala
@@ -72,7 +72,8 @@ private[spark] class HistoryAppStatusStore(
       source.totalGCTime, source.totalInputBytes, source.totalShuffleRead,
       source.totalShuffleWrite, source.isBlacklisted, source.maxMemory, source.addTime,
       source.removeTime, source.removeReason, newExecutorLogs, source.memoryMetrics,
-      source.blacklistedInStages, source.peakMemoryMetrics, source.attributes, source.resources)
+      source.blacklistedInStages, source.peakMemoryMetrics, source.attributes, source.resources,
+      source.resourceProfileId)
   }
 
 }
diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala b/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala
index c3e2444..f365548 100644
--- a/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala
+++ b/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala
@@ -25,17 +25,19 @@ import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.annotation.Evolving
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.Tests._
+import org.apache.spark.scheduler.{LiveListenerBus, SparkListenerResourceProfileAdded}
 import org.apache.spark.util.Utils
 import org.apache.spark.util.Utils.isTesting
 
 /**
  * Manager of resource profiles. The manager allows one place to keep the actual ResourceProfiles
  * and everywhere else we can use the ResourceProfile Id to save on space.
- * Note we never remove a resource profile at this point. Its expected this number if small
+ * Note we never remove a resource profile at this point. Its expected this number is small
  * so this shouldn't be much overhead.
  */
 @Evolving
-private[spark] class ResourceProfileManager(sparkConf: SparkConf) extends Logging {
+private[spark] class ResourceProfileManager(sparkConf: SparkConf,
+    listenerBus: LiveListenerBus) extends Logging {
   private val resourceProfileIdToResourceProfile = new HashMap[Int, ResourceProfile]()
 
   private val (readLock, writeLock) = {
@@ -83,6 +85,7 @@ private[spark] class ResourceProfileManager(sparkConf: SparkConf) extends Loggin
       // force the computation of maxTasks and limitingResource now so we don't have cost later
       rp.limitingResource(sparkConf)
       logInfo(s"Added ResourceProfile id: ${rp.id}")
+      listenerBus.post(SparkListenerResourceProfileAdded(rp))
     }
   }
 
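For context, a hedged sketch of how a profile reaches the addResourceProfile path above. It assumes the stage level scheduling API on master at this point (ResourceProfileBuilder, ExecutorResourceRequests, TaskResourceRequests, RDD.withResources) and an existing SparkContext `sc`; everything outside this patch is an assumption:

    import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfileBuilder, TaskResourceRequests}

    // Request 4 cores and 1 gpu per executor, and 1 gpu per task (illustrative values).
    val execReqs = new ExecutorResourceRequests().cores(4).resource("gpu", 1)
    val taskReqs = new TaskResourceRequests().resource("gpu", 1)
    val rp = new ResourceProfileBuilder().require(execReqs).require(taskReqs).build

    // First use of the profile registers it with the ResourceProfileManager, which
    // now posts SparkListenerResourceProfileAdded to the listener bus.
    val rdd = sc.parallelize(1 to 100, 4).withResources(rp)
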
diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index 24e2a5e..b2e9a0b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -235,6 +235,10 @@ private[spark] class EventLoggingListener(
     }
   }
 
+  override def onResourceProfileAdded(event: SparkListenerResourceProfileAdded): Unit = {
+    logEvent(event, flushLogger = true)
+  }
+
   override def onOtherEvent(event: SparkListenerEvent): Unit = {
     if (event.logEvent) {
       logEvent(event, flushLogger = true)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index c150b03..62d54f3 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -27,6 +27,7 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo
 import org.apache.spark.TaskEndReason
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
+import org.apache.spark.resource.ResourceProfile
 import org.apache.spark.scheduler.cluster.ExecutorInfo
 import org.apache.spark.storage.{BlockManagerId, BlockUpdatedInfo}
 
@@ -207,6 +208,10 @@ case class SparkListenerApplicationEnd(time: Long) extends SparkListenerEvent
 @DeveloperApi
 case class SparkListenerLogStart(sparkVersion: String) extends SparkListenerEvent
 
+@DeveloperApi
+case class SparkListenerResourceProfileAdded(resourceProfile: ResourceProfile)
+  extends SparkListenerEvent
+
 /**
  * Interface for listening to events from the Spark scheduler. Most applications should probably
  * extend SparkListener or SparkFirehoseListener directly, rather than implementing this class.
@@ -348,6 +353,11 @@ private[spark] trait SparkListenerInterface {
    * Called when other events like SQL-specific events are posted.
    */
   def onOtherEvent(event: SparkListenerEvent): Unit
+
+  /**
+   * Called when a Resource Profile is added to the manager.
+   */
+  def onResourceProfileAdded(event: SparkListenerResourceProfileAdded): Unit
 }
 
 
@@ -421,4 +431,6 @@ abstract class SparkListener extends SparkListenerInterface {
       speculativeTask: SparkListenerSpeculativeTaskSubmitted): Unit = { }
 
   override def onOtherEvent(event: SparkListenerEvent): Unit = { }
+
+  override def onResourceProfileAdded(event: SparkListenerResourceProfileAdded): Unit = { }
 }
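
For illustration, a minimal sketch of a user listener consuming the new event; the event class and callback are exactly the ones added above, while the listener name and body are illustrative:

    import org.apache.spark.scheduler.{SparkListener, SparkListenerResourceProfileAdded}

    class ResourceProfileLogger extends SparkListener {
      override def onResourceProfileAdded(event: SparkListenerResourceProfileAdded): Unit = {
        val rp = event.resourceProfile
        // Print the id plus the executor/task requests carried by the profile.
        println(s"ResourceProfile ${rp.id} added: " +
          s"executor requests=${rp.executorResources}, task requests=${rp.taskResources}")
      }
    }

    // Register on an existing SparkContext (`sc` assumed):
    // sc.addSparkListener(new ResourceProfileLogger)
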
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
index 8f6b7ad..3d316c9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
@@ -79,6 +79,8 @@ private[spark] trait SparkListenerBus
         listener.onBlockUpdated(blockUpdated)
       case speculativeTaskSubmitted: SparkListenerSpeculativeTaskSubmitted =>
         listener.onSpeculativeTaskSubmitted(speculativeTaskSubmitted)
+      case resourceProfileAdded: SparkListenerResourceProfileAdded =>
+        listener.onResourceProfileAdded(resourceProfileAdded)
       case _ => listener.onOtherEvent(event)
     }
   }
diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
index c3f22f3..f7b0e9b 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
@@ -28,6 +28,7 @@ import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.CPUS_PER_TASK
 import org.apache.spark.internal.config.Status._
+import org.apache.spark.resource.ResourceProfile.CPUS
 import org.apache.spark.scheduler._
 import org.apache.spark.status.api.v1
 import org.apache.spark.storage._
@@ -51,7 +52,7 @@ private[spark] class AppStatusListener(
   private var sparkVersion = SPARK_VERSION
   private var appInfo: v1.ApplicationInfo = null
   private var appSummary = new AppSummary(0, 0)
-  private var coresPerTask: Int = 1
+  private var defaultCpusPerTask: Int = 1
 
   // How often to update live entities. -1 means "never update" when replaying applications,
   // meaning only the last write will happen. For live applications, this avoids a few
@@ -76,6 +77,7 @@ private[spark] class AppStatusListener(
   private val liveTasks = new HashMap[Long, LiveTask]()
   private val liveRDDs = new HashMap[Int, LiveRDD]()
   private val pools = new HashMap[String, SchedulerPool]()
+  private val liveResourceProfiles = new HashMap[Int, LiveResourceProfile]()
 
   private val SQL_EXECUTION_ID_KEY = "spark.sql.execution.id"
   // Keep the active executor count as a separate variable to avoid having to do synchronization
@@ -145,6 +147,20 @@ private[spark] class AppStatusListener(
     }
   }
 
+  override def onResourceProfileAdded(event: SparkListenerResourceProfileAdded): Unit = {
+    val maxTasks = if (event.resourceProfile.isCoresLimitKnown) {
+      Some(event.resourceProfile.maxTasksPerExecutor(conf))
+    } else {
+      None
+    }
+    val liveRP = new LiveResourceProfile(event.resourceProfile.id,
+      event.resourceProfile.executorResources, event.resourceProfile.taskResources, maxTasks)
+    liveResourceProfiles(event.resourceProfile.id) = liveRP
+    val rpInfo = new v1.ResourceProfileInfo(liveRP.resourceProfileId,
+      liveRP.executorResources, liveRP.taskResources)
+    kvstore.write(new ResourceProfileWrapper(rpInfo))
+  }
+
   override def onEnvironmentUpdate(event: SparkListenerEnvironmentUpdate): Unit = {
     val details = event.environmentDetails
 
@@ -159,10 +175,11 @@ private[spark] class AppStatusListener(
       details.getOrElse("Spark Properties", Nil),
       details.getOrElse("Hadoop Properties", Nil),
       details.getOrElse("System Properties", Nil),
-      details.getOrElse("Classpath Entries", Nil))
+      details.getOrElse("Classpath Entries", Nil),
+      Nil)
 
-    coresPerTask = envInfo.sparkProperties.toMap.get(CPUS_PER_TASK.key).map(_.toInt)
-      .getOrElse(coresPerTask)
+    defaultCpusPerTask = envInfo.sparkProperties.toMap.get(CPUS_PER_TASK.key).map(_.toInt)
+      .getOrElse(defaultCpusPerTask)
 
     kvstore.write(new ApplicationEnvironmentInfoWrapper(envInfo))
   }
@@ -197,10 +214,16 @@ private[spark] class AppStatusListener(
     exec.host = event.executorInfo.executorHost
     exec.isActive = true
     exec.totalCores = event.executorInfo.totalCores
-    exec.maxTasks = event.executorInfo.totalCores / coresPerTask
+    val rpId = event.executorInfo.resourceProfileId
+    val liveRP = liveResourceProfiles.get(rpId)
+    val cpusPerTask = liveRP.flatMap(_.taskResources.get(CPUS))
+      .map(_.amount.toInt).getOrElse(defaultCpusPerTask)
+    val maxTasksPerExec = liveRP.flatMap(_.maxTasksPerExecutor)
+    exec.maxTasks = maxTasksPerExec.getOrElse(event.executorInfo.totalCores / cpusPerTask)
     exec.executorLogs = event.executorInfo.logUrlMap
     exec.resources = event.executorInfo.resourcesInfo
     exec.attributes = event.executorInfo.attributes
+    exec.resourceProfileId = rpId
     liveUpdate(exec, System.nanoTime())
   }
 
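Worked example of the maxTasks fallback above, with illustrative numbers: an executor with totalCores = 8 whose profile requests 2 cpus per task gets exec.maxTasks = 8 / 2 = 4; when the profile's maxTasksPerExecutor is known, that value is used directly instead.
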
diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
index 42d4071..ea033d0 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
@@ -23,6 +23,7 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable.HashMap
 
 import org.apache.spark.{JobExecutionStatus, SparkConf, SparkException}
+import org.apache.spark.resource.ResourceProfileManager
 import org.apache.spark.status.api.v1
 import org.apache.spark.ui.scope._
 import org.apache.spark.util.Utils
@@ -51,6 +52,10 @@ private[spark] class AppStatusStore(
     store.read(klass, klass.getName()).info
   }
 
+  def resourceProfileInfo(): Seq[v1.ResourceProfileInfo] = {
+    store.view(classOf[ResourceProfileWrapper]).asScala.map(_.rpInfo).toSeq
+  }
+
   def jobsList(statuses: JList[JobExecutionStatus]): Seq[v1.JobData] = {
     val it = store.view(classOf[JobDataWrapper]).reverse().asScala.map(_.info)
     if (statuses != null && !statuses.isEmpty()) {
@@ -486,7 +491,8 @@ private[spark] class AppStatusStore(
       accumulatorUpdates = stage.accumulatorUpdates,
       tasks = Some(tasks),
       executorSummary = Some(executorSummary(stage.stageId, stage.attemptId)),
-      killedTasksSummary = stage.killedTasksSummary)
+      killedTasksSummary = stage.killedTasksSummary,
+      resourceProfileId = stage.resourceProfileId)
   }
 
   def rdd(rddId: Int): v1.RDDStorageInfo = {
diff --git a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
index 2714f30..86cb4fe 100644
--- a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
+++ b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
@@ -28,7 +28,7 @@ import com.google.common.collect.Interners
 
 import org.apache.spark.JobExecutionStatus
 import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
-import org.apache.spark.resource.ResourceInformation
+import org.apache.spark.resource.{ExecutorResourceRequest, ResourceInformation, ResourceProfile, TaskResourceRequest}
 import org.apache.spark.scheduler.{AccumulableInfo, StageInfo, TaskInfo}
 import org.apache.spark.status.api.v1
 import org.apache.spark.storage.{RDDInfo, StorageLevel}
@@ -245,6 +245,21 @@ private class LiveTask(
 
 }
 
+private class LiveResourceProfile(
+    val resourceProfileId: Int,
+    val executorResources: Map[String, ExecutorResourceRequest],
+    val taskResources: Map[String, TaskResourceRequest],
+    val maxTasksPerExecutor: Option[Int]) extends LiveEntity {
+
+  def toApi(): v1.ResourceProfileInfo = {
+    new v1.ResourceProfileInfo(resourceProfileId, executorResources, taskResources)
+  }
+
+  override protected def doUpdate(): Any = {
+    new ResourceProfileWrapper(toApi())
+  }
+}
+
 private[spark] class LiveExecutor(val executorId: String, _addTime: Long) extends LiveEntity {
 
   var hostPort: String = null
@@ -285,6 +300,8 @@ private[spark] class LiveExecutor(val executorId: String, _addTime: Long) extend
   var usedOnHeap = 0L
   var usedOffHeap = 0L
 
+  var resourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
+
   def hasMemoryInfo: Boolean = totalOnHeap >= 0L
 
   // peak values for executor level metrics
@@ -327,7 +344,8 @@ private[spark] class LiveExecutor(val executorId: String, _addTime: Long) extend
       blacklistedInStages,
       Some(peakExecutorMetrics).filter(_.isSet),
       attributes,
-      resources)
+      resources,
+      resourceProfileId)
     new ExecutorSummaryWrapper(info)
   }
 }
@@ -465,7 +483,8 @@ private class LiveStage extends LiveEntity {
       accumulatorUpdates = newAccumulatorInfos(info.accumulables.values),
       tasks = None,
       executorSummary = None,
-      killedTasksSummary = killedSummary)
+      killedTasksSummary = killedSummary,
+      resourceProfileId = info.resourceProfileId)
   }
 
   override protected def doUpdate(): Any = {
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/OneApplicationResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/OneApplicationResource.scala
index cf5c759..e0c85fd 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/OneApplicationResource.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/OneApplicationResource.scala
@@ -101,12 +101,14 @@ private[v1] class AbstractApplicationResource extends BaseAppResource {
   @Path("environment")
   def environmentInfo(): ApplicationEnvironmentInfo = withUI { ui =>
     val envInfo = ui.store.environmentInfo()
+    val resourceProfileInfo = ui.store.resourceProfileInfo()
     new v1.ApplicationEnvironmentInfo(
       envInfo.runtime,
       Utils.redact(ui.conf, envInfo.sparkProperties),
       Utils.redact(ui.conf, envInfo.hadoopProperties),
       Utils.redact(ui.conf, envInfo.systemProperties),
-      envInfo.classpathEntries)
+      envInfo.classpathEntries,
+      resourceProfileInfo)
   }
 
   @GET
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
index 5ec9b36..e89e291 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
@@ -30,7 +30,7 @@ import com.fasterxml.jackson.databind.annotation.{JsonDeserialize, JsonSerialize
 import org.apache.spark.JobExecutionStatus
 import org.apache.spark.executor.ExecutorMetrics
 import org.apache.spark.metrics.ExecutorMetricType
-import org.apache.spark.resource.ResourceInformation
+import org.apache.spark.resource.{ExecutorResourceRequest, ResourceInformation, TaskResourceRequest}
 
 case class ApplicationInfo private[spark](
     id: String,
@@ -62,6 +62,11 @@ case class ApplicationAttemptInfo private[spark](
 
 }
 
+class ResourceProfileInfo private[spark](
+    val id: Int,
+    val executorResources: Map[String, ExecutorResourceRequest],
+    val taskResources: Map[String, TaskResourceRequest])
+
 class ExecutorStageSummary private[spark](
     val taskTime : Long,
     val failedTasks : Int,
@@ -109,7 +114,8 @@ class ExecutorSummary private[spark](
     @JsonDeserialize(using = classOf[ExecutorMetricsJsonDeserializer])
     val peakMemoryMetrics: Option[ExecutorMetrics],
     val attributes: Map[String, String],
-    val resources: Map[String, ResourceInformation])
+    val resources: Map[String, ResourceInformation],
+    val resourceProfileId: Int)
 
 class MemoryMetrics private[spark](
     val usedOnHeapStorageMemory: Long,
@@ -252,7 +258,8 @@ class StageData private[spark](
     val accumulatorUpdates: Seq[AccumulableInfo],
     val tasks: Option[Map[Long, TaskData]],
     val executorSummary: Option[Map[String, ExecutorStageSummary]],
-    val killedTasksSummary: Map[String, Int])
+    val killedTasksSummary: Map[String, Int],
+    val resourceProfileId: Int)
 
 class TaskData private[spark](
     val taskId: Long,
@@ -365,12 +372,15 @@ class AccumulableInfo private[spark](
 class VersionInfo private[spark](
   val spark: String)
 
+// Note the resourceProfiles information is only added here on return from the
+// REST call; it is not stored with it.
 class ApplicationEnvironmentInfo private[spark] (
     val runtime: RuntimeInfo,
     val sparkProperties: Seq[(String, String)],
     val hadoopProperties: Seq[(String, String)],
     val systemProperties: Seq[(String, String)],
-    val classpathEntries: Seq[(String, String)])
+    val classpathEntries: Seq[(String, String)],
+    val resourceProfiles: Seq[ResourceProfileInfo])
 
 class RuntimeInfo private[spark](
     val javaVersion: String,
diff --git a/core/src/main/scala/org/apache/spark/status/storeTypes.scala b/core/src/main/scala/org/apache/spark/status/storeTypes.scala
index c957ff7..b40f730 100644
--- a/core/src/main/scala/org/apache/spark/status/storeTypes.scala
+++ b/core/src/main/scala/org/apache/spark/status/storeTypes.scala
@@ -374,6 +374,13 @@ private[spark] class RDDStorageInfoWrapper(val info: RDDStorageInfo) {
 
 }
 
+private[spark] class ResourceProfileWrapper(val rpInfo: ResourceProfileInfo) {
+
+  @JsonIgnore @KVIndex
+  def id: Int = rpInfo.id
+
+}
+
 private[spark] class ExecutorStageSummaryWrapper(
     val stageId: Int,
     val stageAttemptId: Int,
diff --git a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentPage.scala b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentPage.scala
index c6eb461..2f5b731 100644
--- a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentPage.scala
@@ -19,9 +19,11 @@ package org.apache.spark.ui.env
 
 import javax.servlet.http.HttpServletRequest
 
+import scala.collection.mutable.StringBuilder
 import scala.xml.Node
 
 import org.apache.spark.SparkConf
+import org.apache.spark.resource.{ExecutorResourceRequest, TaskResourceRequest}
 import org.apache.spark.status.AppStatusStore
 import org.apache.spark.ui._
 import org.apache.spark.util.Utils
@@ -38,6 +40,37 @@ private[ui] class EnvironmentPage(
       "Java Home" -> appEnv.runtime.javaHome,
       "Scala Version" -> appEnv.runtime.scalaVersion)
 
+    def constructExecutorRequestString(execReqs: Map[String, ExecutorResourceRequest]): String = {
+      execReqs.map {
+        case (_, execReq) =>
+          val execStr = new StringBuilder(s"\t${execReq.resourceName}: [amount: ${execReq.amount}")
+          if (execReq.discoveryScript.nonEmpty) {
+            execStr ++= s", discovery: ${execReq.discoveryScript}"
+          }
+          if (execReq.vendor.nonEmpty) {
+            execStr ++= s", vendor: ${execReq.vendor}"
+          }
+          execStr ++= "]"
+          execStr.toString()
+      }.mkString("\n")
+    }
+
+    def constructTaskRequestString(taskReqs: Map[String, TaskResourceRequest]): String = {
+      taskReqs.map {
+        case (_, taskReq) => s"\t${taskReq.resourceName}: [amount: ${taskReq.amount}]"
+      }.mkString("\n")
+    }
+
+    val resourceProfileInfo = store.resourceProfileInfo().map { rinfo =>
+      val einfo = constructExecutorRequestString(rinfo.executorResources)
+      val tinfo = constructTaskRequestString(rinfo.taskResources)
+      val res = s"Executor Reqs:\n$einfo\nTask Reqs:\n$tinfo"
+      (rinfo.id.toString, res)
+    }.toMap
+
+    val resourceProfileInformationTable = UIUtils.listingTable(resourceProfileHeader,
+      jvmRowDataPre, resourceProfileInfo.toSeq.sortWith(_._1.toInt < _._1.toInt),
+      fixedWidth = true, headerClasses = headerClassesNoSortValues)
     val runtimeInformationTable = UIUtils.listingTable(
       propertyHeader, jvmRow, jvmInformation.toSeq.sorted, fixedWidth = true,
       headerClasses = headerClasses)
@@ -77,6 +110,17 @@ private[ui] class EnvironmentPage(
         <div class="aggregated-sparkProperties collapsible-table">
           {sparkPropertiesTable}
         </div>
+        <span class="collapse-aggregated-execResourceProfileInformation collapse-table"
+              onClick="collapseTable('collapse-aggregated-execResourceProfileInformation',
+            'aggregated-execResourceProfileInformation')">
+          <h4>
+            <span class="collapse-table-arrow arrow-open"></span>
+            <a>Resource Profiles</a>
+          </h4>
+        </span>
+        <div class="aggregated-execResourceProfileInformation collapsible-table">
+          {resourceProfileInformationTable}
+        </div>
         <span class="collapse-aggregated-hadoopProperties collapse-table"
               onClick="collapseTable('collapse-aggregated-hadoopProperties',
             'aggregated-hadoopProperties')">
@@ -115,10 +159,14 @@ private[ui] class EnvironmentPage(
     UIUtils.headerSparkPage(request, "Environment", content, parent)
   }
 
+  private def resourceProfileHeader = Seq("Resource Profile Id", "Resource Profile Contents")
   private def propertyHeader = Seq("Name", "Value")
   private def classPathHeader = Seq("Resource", "Source")
   private def headerClasses = Seq("sorttable_alpha", "sorttable_alpha")
+  private def headerClassesNoSortValues = Seq("sorttable_numeric", "sorttable_nosort")
 
+  private def jvmRowDataPre(kv: (String, String)) =
+    <tr><td>{kv._1}</td><td><pre>{kv._2}</pre></td></tr>
   private def jvmRow(kv: (String, String)) = <tr><td>{kv._1}</td><td>{kv._2}</td></tr>
   private def propertyRow(kv: (String, String)) = <tr><td>{kv._1}</td><td>{kv._2}</td></tr>
   private def classPathRow(data: (String, String)) = <tr><td>{data._1}</td><td>{data._2}</td></tr>
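
Illustrative rendering produced by the two helper functions above, for a hypothetical profile with one gpu executor request and one gpu task request (amounts and script name are made up; the layout follows the string construction exactly):

    Executor Reqs:
            gpu: [amount: 2, discovery: ./getGpusResources.sh, vendor: nvidia]
    Task Reqs:
            gpu: [amount: 1]
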
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala
index 9be7124..542dc39 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala
@@ -26,6 +26,7 @@ import scala.xml.{Node, NodeSeq, Unparsed, Utility}
 import org.apache.commons.text.StringEscapeUtils
 
 import org.apache.spark.JobExecutionStatus
+import org.apache.spark.resource.ResourceProfile
 import org.apache.spark.status.AppStatusStore
 import org.apache.spark.status.api.v1
 import org.apache.spark.ui._
@@ -253,7 +254,8 @@ private[ui] class JobPage(parent: JobsTab, store: AppStatusStore) extends WebUIP
           accumulatorUpdates = Nil,
           tasks = None,
           executorSummary = None,
-          killedTasksSummary = Map())
+          killedTasksSummary = Map(),
+          ResourceProfile.UNKNOWN_RESOURCE_PROFILE_ID)
       }
     }
 
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 7973d30..1b07227 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -143,6 +143,10 @@ private[ui] class StagePage(parent: StagesTab, store: AppStatusStore) extends We
       <div>
         <ul class="list-unstyled">
           <li>
+            <strong>Resource Profile Id: </strong>
+            {stageData.resourceProfileId}
+          </li>
+          <li>
             <strong>Total Time Across All Tasks: </strong>
             {UIUtils.formatDuration(stageData.executorRunTime)}
           </li>
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 9254ac9..26bbff5 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -33,7 +33,7 @@ import org.apache.spark._
 import org.apache.spark.executor._
 import org.apache.spark.metrics.ExecutorMetricType
 import org.apache.spark.rdd.RDDOperationScope
-import org.apache.spark.resource.{ResourceInformation, ResourceProfile}
+import org.apache.spark.resource.{ExecutorResourceRequest, ResourceInformation, ResourceProfile, TaskResourceRequest}
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.ExecutorInfo
 import org.apache.spark.storage._
@@ -105,6 +105,8 @@ private[spark] object JsonProtocol {
         stageExecutorMetricsToJson(stageExecutorMetrics)
       case blockUpdate: SparkListenerBlockUpdated =>
         blockUpdateToJson(blockUpdate)
+      case resourceProfileAdded: SparkListenerResourceProfileAdded =>
+        resourceProfileAddedToJson(resourceProfileAdded)
       case _ => parse(mapper.writeValueAsString(event))
     }
   }
@@ -224,6 +226,15 @@ private[spark] object JsonProtocol {
     ("Timestamp" -> applicationEnd.time)
   }
 
+  def resourceProfileAddedToJson(profileAdded: SparkListenerResourceProfileAdded): JValue = {
+    ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.resourceProfileAdded) ~
+      ("Resource Profile Id" -> profileAdded.resourceProfile.id) ~
+      ("Executor Resource Requests" ->
+        executorResourceRequestMapToJson(profileAdded.resourceProfile.executorResources)) ~
+      ("Task Resource Requests" ->
+        taskResourceRequestMapToJson(profileAdded.resourceProfile.taskResources))
+  }
+
   def executorAddedToJson(executorAdded: SparkListenerExecutorAdded): JValue = {
     ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.executorAdded) ~
     ("Timestamp" -> executorAdded.time) ~
@@ -297,7 +308,8 @@ private[spark] object JsonProtocol {
     ("Submission Time" -> submissionTime) ~
     ("Completion Time" -> completionTime) ~
     ("Failure Reason" -> failureReason) ~
-    ("Accumulables" -> accumulablesToJson(stageInfo.accumulables.values))
+    ("Accumulables" -> accumulablesToJson(stageInfo.accumulables.values)) ~
+    ("Resource Profile Id" -> stageInfo.resourceProfileId)
   }
 
   def taskInfoToJson(taskInfo: TaskInfo): JValue = {
@@ -500,7 +512,8 @@ private[spark] object JsonProtocol {
     ("Total Cores" -> executorInfo.totalCores) ~
     ("Log Urls" -> mapToJson(executorInfo.logUrlMap)) ~
     ("Attributes" -> mapToJson(executorInfo.attributes)) ~
-    ("Resources" -> resourcesMapToJson(executorInfo.resourcesInfo))
+    ("Resources" -> resourcesMapToJson(executorInfo.resourcesInfo)) ~
+    ("Resource Profile Id" -> executorInfo.resourceProfileId)
   }
 
   def resourcesMapToJson(m: Map[String, ResourceInformation]): JValue = {
@@ -518,6 +531,34 @@ private[spark] object JsonProtocol {
     ("Disk Size" -> blockUpdatedInfo.diskSize)
   }
 
+  def executorResourceRequestToJson(execReq: ExecutorResourceRequest): JValue = {
+    ("Resource Name" -> execReq.resourceName) ~
+    ("Amount" -> execReq.amount) ~
+    ("Discovery Script" -> execReq.discoveryScript) ~
+    ("Vendor" -> execReq.vendor)
+  }
+
+  def executorResourceRequestMapToJson(m: Map[String, ExecutorResourceRequest]): JValue = {
+    val jsonFields = m.map {
+      case (k, execReq) =>
+        JField(k, executorResourceRequestToJson(execReq))
+    }
+    JObject(jsonFields.toList)
+  }
+
+  def taskResourceRequestToJson(taskReq: TaskResourceRequest): JValue = {
+    ("Resource Name" -> taskReq.resourceName) ~
+    ("Amount" -> taskReq.amount)
+  }
+
+  def taskResourceRequestMapToJson(m: Map[String, TaskResourceRequest]): JValue = {
+    val jsonFields = m.map {
+      case (k, taskReq) =>
+        JField(k, taskResourceRequestToJson(taskReq))
+    }
+    JObject(jsonFields.toList)
+  }
+
   /** ------------------------------ *
    * Util JSON serialization methods |
    * ------------------------------- */
@@ -577,6 +618,7 @@ private[spark] object JsonProtocol {
     val metricsUpdate = Utils.getFormattedClassName(SparkListenerExecutorMetricsUpdate)
     val stageExecutorMetrics = Utils.getFormattedClassName(SparkListenerStageExecutorMetrics)
     val blockUpdate = Utils.getFormattedClassName(SparkListenerBlockUpdated)
+    val resourceProfileAdded = Utils.getFormattedClassName(SparkListenerResourceProfileAdded)
   }
 
   def sparkEventFromJson(json: JValue): SparkListenerEvent = {
@@ -602,6 +644,7 @@ private[spark] object JsonProtocol {
       case `metricsUpdate` => executorMetricsUpdateFromJson(json)
       case `stageExecutorMetrics` => stageExecutorMetricsFromJson(json)
       case `blockUpdate` => blockUpdateFromJson(json)
+      case `resourceProfileAdded` => resourceProfileAddedFromJson(json)
       case other => mapper.readValue(compact(render(json)), Utils.classForName(other))
         .asInstanceOf[SparkListenerEvent]
     }
@@ -678,6 +721,45 @@ private[spark] object JsonProtocol {
     SparkListenerJobEnd(jobId, completionTime, jobResult)
   }
 
+  def resourceProfileAddedFromJson(json: JValue): SparkListenerResourceProfileAdded = {
+    val profId = (json \ "Resource Profile Id").extract[Int]
+    val executorReqs = executorResourceRequestMapFromJson(json \ "Executor Resource Requests")
+    val taskReqs = taskResourceRequestMapFromJson(json \ "Task Resource Requests")
+    val rp = new ResourceProfile(executorReqs.toMap, taskReqs.toMap)
+    rp.setResourceProfileId(profId)
+    SparkListenerResourceProfileAdded(rp)
+  }
+
+  def executorResourceRequestFromJson(json: JValue): ExecutorResourceRequest = {
+    val rName = (json \ "Resource Name").extract[String]
+    val amount = (json \ "Amount").extract[Int]
+    val discoveryScript = (json \ "Discovery Script").extract[String]
+    val vendor = (json \ "Vendor").extract[String]
+    new ExecutorResourceRequest(rName, amount, discoveryScript, vendor)
+  }
+
+  def taskResourceRequestFromJson(json: JValue): TaskResourceRequest = {
+    val rName = (json \ "Resource Name").extract[String]
+    val amount = (json \ "Amount").extract[Int]
+    new TaskResourceRequest(rName, amount)
+  }
+
+  def taskResourceRequestMapFromJson(json: JValue): Map[String, TaskResourceRequest] = {
+    val jsonFields = json.asInstanceOf[JObject].obj
+    jsonFields.map { case JField(k, v) =>
+      val req = taskResourceRequestFromJson(v)
+      (k, req)
+    }.toMap
+  }
+
+  def executorResourceRequestMapFromJson(json: JValue): Map[String, ExecutorResourceRequest] = {
+    val jsonFields = json.asInstanceOf[JObject].obj
+    jsonFields.map { case JField(k, v) =>
+      val req = executorResourceRequestFromJson(v)
+      (k, req)
+    }.toMap
+  }
+
   def environmentUpdateFromJson(json: JValue): SparkListenerEnvironmentUpdate = {
     // For compatible with previous event logs
     val hadoopProperties = jsonOption(json \ "Hadoop Properties").map(mapFromJson(_).toSeq)
@@ -804,9 +886,10 @@ private[spark] object JsonProtocol {
       }
     }
 
-    val stageInfo = new StageInfo(
-      stageId, attemptId, stageName, numTasks, rddInfos, parentIds, details,
-      resourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
+    val rpId = jsonOption(json \ "Resource Profile Id").map(_.extract[Int])
+    val stageProf = rpId.getOrElse(ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
+    val stageInfo = new StageInfo(stageId, attemptId, stageName, numTasks, rddInfos,
+      parentIds, details, resourceProfileId = stageProf)
     stageInfo.submissionTime = submissionTime
     stageInfo.completionTime = completionTime
     stageInfo.failureReason = failureReason
@@ -1109,7 +1192,11 @@ private[spark] object JsonProtocol {
       case Some(resources) => resourcesMapFromJson(resources).toMap
       case None => Map.empty[String, ResourceInformation]
     }
-    new ExecutorInfo(executorHost, totalCores, logUrls, attributes, resources)
+    val resourceProfileId = jsonOption(json \ "Resource Profile Id") match {
+      case Some(id) => id.extract[Int]
+      case None => ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
+    }
+    new ExecutorInfo(executorHost, totalCores, logUrls, attributes, resources, resourceProfileId)
   }
 
   def blockUpdatedInfoFromJson(json: JValue): BlockUpdatedInfo = {
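
For reference, the shape of the new event as serialized by resourceProfileAddedToJson above, with illustrative values (the field names come directly from the code; the amounts and script name are made up):

    {
      "Event" : "SparkListenerResourceProfileAdded",
      "Resource Profile Id" : 1,
      "Executor Resource Requests" : {
        "cores" : { "Resource Name" : "cores", "Amount" : 4, "Discovery Script" : "", "Vendor" : "" },
        "gpu" : { "Resource Name" : "gpu", "Amount" : 1, "Discovery Script" : "./getGpusResources.sh", "Vendor" : "" }
      },
      "Task Resource Requests" : {
        "cpus" : { "Resource Name" : "cpus", "Amount" : 1.0 },
        "gpu" : { "Resource Name" : "gpu", "Amount" : 1.0 }
      }
    }
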
diff --git a/core/src/test/resources/HistoryServerExpectations/app_environment_expectation.json b/core/src/test/resources/HistoryServerExpectations/app_environment_expectation.json
index a646172..0b617a7 100644
--- a/core/src/test/resources/HistoryServerExpectations/app_environment_expectation.json
+++ b/core/src/test/resources/HistoryServerExpectations/app_environment_expectation.json
@@ -282,5 +282,6 @@
     [ "/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/hadoop-mapreduce-client-jobclient-2.2.0.jar", "System Classpath" ],
     [ "/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/spark-sketch_2.11-2.1.0-SNAPSHOT.jar", "System Classpath" ],
     [ "/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/pmml-model-1.2.15.jar", "System Classpath" ]
-  ]
+  ],
+  "resourceProfiles" : [ ]
 }
diff --git a/core/src/test/resources/HistoryServerExpectations/application_list_json_expectation.json b/core/src/test/resources/HistoryServerExpectations/application_list_json_expectation.json
index 6e6d28b..d2b3d1b 100644
--- a/core/src/test/resources/HistoryServerExpectations/application_list_json_expectation.json
+++ b/core/src/test/resources/HistoryServerExpectations/application_list_json_expectation.json
@@ -1,4 +1,19 @@
 [ {
+  "id" : "application_1578436911597_0052",
+  "name" : "Spark shell",
+  "attempts" : [ {
+    "startTime" : "2020-01-11T17:44:22.851GMT",
+    "endTime" : "2020-01-11T17:46:42.615GMT",
+    "lastUpdated" : "",
+    "duration" : 139764,
+    "sparkUser" : "tgraves",
+    "completed" : true,
+    "appSparkVersion" : "3.0.0-SNAPSHOT",
+    "endTimeEpoch" : 1578764802615,
+    "startTimeEpoch" : 1578764662851,
+    "lastUpdatedEpoch" : 0
+  } ]
+}, {
   "id" : "application_1555004656427_0144",
   "name" : "Spark shell",
   "attempts" : [ {
diff --git a/core/src/test/resources/HistoryServerExpectations/blacklisting_for_stage_expectation.json b/core/src/test/resources/HistoryServerExpectations/blacklisting_for_stage_expectation.json
index b18b19f..0d197ea 100644
--- a/core/src/test/resources/HistoryServerExpectations/blacklisting_for_stage_expectation.json
+++ b/core/src/test/resources/HistoryServerExpectations/blacklisting_for_stage_expectation.json
@@ -717,5 +717,6 @@
       "isBlacklistedForStage" : false
     }
   },
-  "killedTasksSummary" : { }
+  "killedTasksSummary" : { },
+  "resourceProfileId" : 0
 }
diff --git a/core/src/test/resources/HistoryServerExpectations/blacklisting_node_for_stage_expectation.json b/core/src/test/resources/HistoryServerExpectations/blacklisting_node_for_stage_expectation.json
index 8d11081..24d73fa 100644
--- a/core/src/test/resources/HistoryServerExpectations/blacklisting_node_for_stage_expectation.json
+++ b/core/src/test/resources/HistoryServerExpectations/blacklisting_node_for_stage_expectation.json
@@ -876,5 +876,6 @@
       "isBlacklistedForStage" : true
     }
   },
-  "killedTasksSummary" : { }
+  "killedTasksSummary" : { },
+  "resourceProfileId" : 0
 }
diff --git a/core/src/test/resources/HistoryServerExpectations/complete_stage_list_json_expectation.json b/core/src/test/resources/HistoryServerExpectations/complete_stage_list_json_expectation.json
index a47cd26..a452488 100644
--- a/core/src/test/resources/HistoryServerExpectations/complete_stage_list_json_expectation.json
+++ b/core/src/test/resources/HistoryServerExpectations/complete_stage_list_json_expectation.json
@@ -41,7 +41,8 @@
   "schedulingPool" : "default",
   "rddIds" : [ 6, 5 ],
   "accumulatorUpdates" : [ ],
-  "killedTasksSummary" : { }
+  "killedTasksSummary" : { },
+  "resourceProfileId" : 0
 }, {
   "status" : "COMPLETE",
   "stageId" : 1,
@@ -85,7 +86,8 @@
   "schedulingPool" : "default",
   "rddIds" : [ 1, 0 ],
   "accumulatorUpdates" : [ ],
-  "killedTasksSummary" : { }
+  "killedTasksSummary" : { },
+  "resourceProfileId" : 0
 }, {
   "status" : "COMPLETE",
   "stageId" : 0,
@@ -129,5 +131,6 @@
   "schedulingPool" : "default",
   "rddIds" : [ 0 ],
   "accumulatorUpdates" : [ ],
-  "killedTasksSummary" : { }
+  "killedTasksSummary" : { },
+  "resourceProfileId" : 0
 } ]
diff --git a/core/src/test/resources/HistoryServerExpectations/completed_app_list_json_expectation.json b/core/src/test/resources/HistoryServerExpectations/completed_app_list_json_expectation.json
index 6e6d28b..d2b3d1b 100644
--- a/core/src/test/resources/HistoryServerExpectations/completed_app_list_json_expectation.json
+++ b/core/src/test/resources/HistoryServerExpectations/completed_app_list_json_expectation.json
@@ -1,4 +1,19 @@
 [ {
+  "id" : "application_1578436911597_0052",
+  "name" : "Spark shell",
+  "attempts" : [ {
+    "startTime" : "2020-01-11T17:44:22.851GMT",
+    "endTime" : "2020-01-11T17:46:42.615GMT",
+    "lastUpdated" : "",
+    "duration" : 139764,
+    "sparkUser" : "tgraves",
+    "completed" : true,
+    "appSparkVersion" : "3.0.0-SNAPSHOT",
+    "endTimeEpoch" : 1578764802615,
+    "startTimeEpoch" : 1578764662851,
+    "lastUpdatedEpoch" : 0
+  } ]
+}, {
   "id" : "application_1555004656427_0144",
   "name" : "Spark shell",
   "attempts" : [ {
diff --git a/core/src/test/resources/HistoryServerExpectations/executor_list_json_expectation.json b/core/src/test/resources/HistoryServerExpectations/executor_list_json_expectation.json
index eadf271..6742567 100644
--- a/core/src/test/resources/HistoryServerExpectations/executor_list_json_expectation.json
+++ b/core/src/test/resources/HistoryServerExpectations/executor_list_json_expectation.json
@@ -22,5 +22,6 @@
   "executorLogs" : { },
   "blacklistedInStages" : [ ],
   "attributes" : { },
-  "resources" : { }
+  "resources" : { },
+  "resourceProfileId" : 0
 } ]
diff --git a/core/src/test/resources/HistoryServerExpectations/executor_list_with_executor_metrics_json_expectation.json b/core/src/test/resources/HistoryServerExpectations/executor_list_with_executor_metrics_json_expectation.json
index d322485..d052a27 100644
--- a/core/src/test/resources/HistoryServerExpectations/executor_list_with_executor_metrics_json_expectation.json
+++ b/core/src/test/resources/HistoryServerExpectations/executor_list_with_executor_metrics_json_expectation.json
@@ -50,7 +50,8 @@
     "MajorGCTime" : 144
   },
   "attributes" : { },
-  "resources" : { }
+  "resources" : { },
+  "resourceProfileId" : 0
 }, {
   "id" : "3",
   "hostPort" : "test-3.vpc.company.com:37641",
@@ -116,7 +117,8 @@
     "NM_HOST" : "test-3.vpc.company.com",
     "CONTAINER_ID" : "container_1553914137147_0018_01_000004"
   },
-  "resources" : { }
+  "resources" : { },
+  "resourceProfileId" : 0
 }, {
   "id" : "2",
   "hostPort" : "test-4.vpc.company.com:33179",
@@ -182,7 +184,8 @@
     "NM_HOST" : "test-4.vpc.company.com",
     "CONTAINER_ID" : "container_1553914137147_0018_01_000003"
   },
-  "resources" : { }
+  "resources" : { },
+  "resourceProfileId" : 0
 }, {
   "id" : "1",
   "hostPort" : "test-2.vpc.company.com:43764",
@@ -248,5 +251,6 @@
     "NM_HOST" : "test-2.vpc.company.com",
     "CONTAINER_ID" : "container_1553914137147_0018_01_000002"
   },
-  "resources" : { }
+  "resources" : { },
+  "resourceProfileId" : 0
 } ]
diff --git a/core/src/test/resources/HistoryServerExpectations/executor_memory_usage_expectation.json b/core/src/test/resources/HistoryServerExpectations/executor_memory_usage_expectation.json
index 7c3f77d..91574ca 100644
--- a/core/src/test/resources/HistoryServerExpectations/executor_memory_usage_expectation.json
+++ b/core/src/test/resources/HistoryServerExpectations/executor_memory_usage_expectation.json
@@ -28,7 +28,8 @@
   },
   "blacklistedInStages" : [ ],
   "attributes" : { },
-  "resources" : { }
+  "resources" : { },
+  "resourceProfileId" : 0
 }, {
   "id" : "3",
   "hostPort" : "172.22.0.167:51485",
@@ -62,7 +63,8 @@
   },
   "blacklistedInStages" : [ ],
   "attributes" : { },
-  "resources" : { }
+  "resources" : { },
+  "resourceProfileId" : 0
 } ,{
   "id" : "2",
   "hostPort" : "172.22.0.167:51487",
@@ -96,7 +98,8 @@
   },
   "blacklistedInStages" : [ ],
   "attributes" : { },
-  "resources" : { }
+  "resources" : { },
+  "resourceProfileId" : 0
 }, {
   "id" : "1",
   "hostPort" : "172.22.0.167:51490",
@@ -130,7 +133,8 @@
   },
   "blacklistedInStages" : [ ],
   "attributes" : { },
-  "resources" : { }
+  "resources" : { },
+  "resourceProfileId" : 0
 }, {
   "id" : "0",
   "hostPort" : "172.22.0.167:51491",
@@ -164,5 +168,6 @@
   },
   "blacklistedInStages" : [ ],
   "attributes" : { },
-  "resources" : { }
+  "resources" : { },
+  "resourceProfileId" : 0
 } ]
diff --git a/core/src/test/resources/HistoryServerExpectations/executor_node_blacklisting_expectation.json b/core/src/test/resources/HistoryServerExpectations/executor_node_blacklisting_expectation.json
index 0986e85..f14b9a5 100644
--- a/core/src/test/resources/HistoryServerExpectations/executor_node_blacklisting_expectation.json
+++ b/core/src/test/resources/HistoryServerExpectations/executor_node_blacklisting_expectation.json
@@ -28,7 +28,8 @@
   },
   "blacklistedInStages" : [ ],
   "attributes" : { },
-  "resources" : { }
+  "resources" : { },
+  "resourceProfileId" : 0
 }, {
   "id" : "3",
   "hostPort" : "172.22.0.167:51485",
@@ -62,7 +63,8 @@
   },
   "blacklistedInStages" : [ ],
   "attributes" : { },
-  "resources" : { }
+  "resources" : { },
+  "resourceProfileId" : 0
 }, {
   "id" : "2",
   "hostPort" : "172.22.0.167:51487",
@@ -96,7 +98,8 @@
   },
   "blacklistedInStages" : [ ],
   "attributes" : { },
-  "resources" : { }
+  "resources" : { },
+  "resourceProfileId" : 0
 }, {
   "id" : "1",
   "hostPort" : "172.22.0.167:51490",
@@ -130,7 +133,8 @@
   },
   "blacklistedInStages" : [ ],
   "attributes" : { },
-  "resources" : { }
+  "resources" : { },
+  "resourceProfileId" : 0
 }, {
   "id" : "0",
   "hostPort" : "172.22.0.167:51491",
@@ -164,5 +168,6 @@
   },
   "blacklistedInStages" : [ ],
   "attributes" : { },
-  "resources" : { }
+  "resources" : { },
+  "resourceProfileId" : 0
 } ]
diff --git a/core/src/test/resources/HistoryServerExpectations/executor_node_blacklisting_unblacklisting_expectation.json b/core/src/test/resources/HistoryServerExpectations/executor_node_blacklisting_unblacklisting_expectation.json
index 26d6651..3645387 100644
--- a/core/src/test/resources/HistoryServerExpectations/executor_node_blacklisting_unblacklisting_expectation.json
+++ b/core/src/test/resources/HistoryServerExpectations/executor_node_blacklisting_unblacklisting_expectation.json
@@ -22,7 +22,8 @@
   "executorLogs" : { },
   "blacklistedInStages" : [ ],
   "attributes" : { },
-  "resources" : { }
+  "resources" : { },
+  "resourceProfileId" : 0
 }, {
   "id" : "3",
   "hostPort" : "172.22.0.111:64543",
@@ -50,7 +51,8 @@
   },
   "blacklistedInStages" : [ ],
   "attributes" : { },
-  "resources" : { }
+  "resources" : { },
+  "resourceProfileId" : 0
 }, {
   "id" : "2",
   "hostPort" : "172.22.0.111:64539",
@@ -78,7 +80,8 @@
   },
   "blacklistedInStages" : [ ],
   "attributes" : { },
-  "resources" : { }
+  "resources" : { },
+  "resourceProfileId" : 0
 }, {
   "id" : "1",
   "hostPort" : "172.22.0.111:64541",
@@ -106,7 +109,8 @@
   },
   "blacklistedInStages" : [ ],
   "attributes" : { },
-  "resources" : { }
+  "resources" : { },
+  "resourceProfileId" : 0
 }, {
   "id" : "0",
   "hostPort" : "172.22.0.111:64540",
@@ -134,5 +138,6 @@
   },
   "blacklistedInStages" : [ ],
   "attributes" : { },
-  "resources" : { }
+  "resources" : { },
+  "resourceProfileId" : 0
 } ]
diff --git a/core/src/test/resources/HistoryServerExpectations/executor_resource_information_expectation.json b/core/src/test/resources/HistoryServerExpectations/executor_resource_information_expectation.json
index e69ab3b..165389c 100644
--- a/core/src/test/resources/HistoryServerExpectations/executor_resource_information_expectation.json
+++ b/core/src/test/resources/HistoryServerExpectations/executor_resource_information_expectation.json
@@ -28,7 +28,8 @@
   },
   "blacklistedInStages" : [ ],
   "attributes" : { },
-  "resources" : { }
+  "resources" : { },
+  "resourceProfileId" : 0
 }, {
   "id" : "2",
   "hostPort" : "tomg-test:46005",
@@ -77,7 +78,8 @@
       "name" : "gpu",
       "addresses" : [ "0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12" ]
     }
-  }
+  },
+  "resourceProfileId" : 0
 }, {
   "id" : "1",
   "hostPort" : "tomg-test:44873",
@@ -126,5 +128,6 @@
       "name" : "gpu",
       "addresses" : [ "0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12" ]
     }
-  }
+  },
+  "resourceProfileId" : 0
 } ]
diff --git a/core/src/test/resources/HistoryServerExpectations/failed_stage_list_json_expectation.json b/core/src/test/resources/HistoryServerExpectations/failed_stage_list_json_expectation.json
index da26271..c387416 100644
--- a/core/src/test/resources/HistoryServerExpectations/failed_stage_list_json_expectation.json
+++ b/core/src/test/resources/HistoryServerExpectations/failed_stage_list_json_expectation.json
@@ -42,5 +42,6 @@
   "schedulingPool" : "default",
   "rddIds" : [ 3, 2 ],
   "accumulatorUpdates" : [ ],
-  "killedTasksSummary" : { }
+  "killedTasksSummary" : { },
+  "resourceProfileId" : 0
 } ]
diff --git a/core/src/test/resources/HistoryServerExpectations/limit_app_list_json_expectation.json b/core/src/test/resources/HistoryServerExpectations/limit_app_list_json_expectation.json
index 3102909..82489e9 100644
--- a/core/src/test/resources/HistoryServerExpectations/limit_app_list_json_expectation.json
+++ b/core/src/test/resources/HistoryServerExpectations/limit_app_list_json_expectation.json
@@ -1,4 +1,19 @@
 [ {
+  "id" : "application_1578436911597_0052",
+  "name" : "Spark shell",
+  "attempts" : [ {
+    "startTime" : "2020-01-11T17:44:22.851GMT",
+    "endTime" : "2020-01-11T17:46:42.615GMT",
+    "lastUpdated" : "",
+    "duration" : 139764,
+    "sparkUser" : "tgraves",
+    "completed" : true,
+    "appSparkVersion" : "3.0.0-SNAPSHOT",
+    "endTimeEpoch" : 1578764802615,
+    "startTimeEpoch" : 1578764662851,
+    "lastUpdatedEpoch" : 0
+  } ]
+}, {
   "id" : "application_1555004656427_0144",
   "name" : "Spark shell",
   "attempts" : [ {
@@ -28,19 +43,4 @@
     "endTimeEpoch" : 1554756046454,
     "lastUpdatedEpoch" : 0
   } ]
-}, {
-  "id" : "application_1516285256255_0012",
-  "name" : "Spark shell",
-  "attempts" : [ {
-    "startTime" : "2018-01-18T18:30:35.119GMT",
-    "endTime" : "2018-01-18T18:38:27.938GMT",
-    "lastUpdated" : "",
-    "duration" : 472819,
-    "sparkUser" : "attilapiros",
-    "completed" : true,
-    "appSparkVersion" : "2.3.0-SNAPSHOT",
-    "lastUpdatedEpoch" : 0,
-    "startTimeEpoch" : 1516300235119,
-    "endTimeEpoch" : 1516300707938
-  } ]
 } ]
diff --git a/core/src/test/resources/HistoryServerExpectations/minDate_app_list_json_expectation.json b/core/src/test/resources/HistoryServerExpectations/minDate_app_list_json_expectation.json
index 794f151..ac2bb0e 100644
--- a/core/src/test/resources/HistoryServerExpectations/minDate_app_list_json_expectation.json
+++ b/core/src/test/resources/HistoryServerExpectations/minDate_app_list_json_expectation.json
@@ -1,5 +1,19 @@
-[
-  {
+[ {
+  "id" : "application_1578436911597_0052",
+  "name" : "Spark shell",
+  "attempts" : [ {
+    "startTime" : "2020-01-11T17:44:22.851GMT",
+    "endTime" : "2020-01-11T17:46:42.615GMT",
+    "lastUpdated" : "",
+    "duration" : 139764,
+    "sparkUser" : "tgraves",
+    "completed" : true,
+    "appSparkVersion" : "3.0.0-SNAPSHOT",
+    "endTimeEpoch" : 1578764802615,
+    "startTimeEpoch" : 1578764662851,
+    "lastUpdatedEpoch" : 0
+  } ]
+}, {
     "id": "application_1555004656427_0144",
     "name": "Spark shell",
     "attempts": [
diff --git a/core/src/test/resources/HistoryServerExpectations/minEndDate_app_list_json_expectation.json b/core/src/test/resources/HistoryServerExpectations/minEndDate_app_list_json_expectation.json
index adcdcce..1561676 100644
--- a/core/src/test/resources/HistoryServerExpectations/minEndDate_app_list_json_expectation.json
+++ b/core/src/test/resources/HistoryServerExpectations/minEndDate_app_list_json_expectation.json
@@ -1,4 +1,19 @@
 [ {
+  "id" : "application_1578436911597_0052",
+  "name" : "Spark shell",
+  "attempts" : [ {
+    "startTime" : "2020-01-11T17:44:22.851GMT",
+    "endTime" : "2020-01-11T17:46:42.615GMT",
+    "lastUpdated" : "",
+    "duration" : 139764,
+    "sparkUser" : "tgraves",
+    "completed" : true,
+    "appSparkVersion" : "3.0.0-SNAPSHOT",
+    "endTimeEpoch" : 1578764802615,
+    "startTimeEpoch" : 1578764662851,
+    "lastUpdatedEpoch" : 0
+  } ]
+}, {
   "id" : "application_1555004656427_0144",
   "name" : "Spark shell",
   "attempts" : [ {
diff --git a/core/src/test/resources/HistoryServerExpectations/multiple_resource_profiles_expectation.json b/core/src/test/resources/HistoryServerExpectations/multiple_resource_profiles_expectation.json
new file mode 100644
index 0000000..5c1e4cc
--- /dev/null
+++ b/core/src/test/resources/HistoryServerExpectations/multiple_resource_profiles_expectation.json
@@ -0,0 +1,112 @@
+{
+  "runtime" : {
+    "javaVersion" : "1.8.0_232 (Private Build)",
+    "javaHome" : "/usr/lib/jvm/java-8-openjdk-amd64/jre",
+    "scalaVersion" : "version 2.12.10"
+  },
+  "sparkProperties" : [ ],
+  "hadoopProperties" : [ ],
+  "systemProperties" : [ ],
+  "classpathEntries" : [ ],
+  "resourceProfiles" : [ {
+    "id" : 0,
+    "executorResources" : {
+      "cores" : {
+        "resourceName" : "cores",
+        "amount" : 1,
+        "discoveryScript" : "",
+        "vendor" : ""
+      },
+      "memory" : {
+        "resourceName" : "memory",
+        "amount" : 1024,
+        "discoveryScript" : "",
+        "vendor" : ""
+      },
+      "gpu" : {
+        "resourceName" : "gpu",
+        "amount" : 1,
+        "discoveryScript" : "/home/tgraves/getGpus",
+        "vendor" : ""
+      }
+    },
+    "taskResources" : {
+      "cpus" : {
+        "resourceName" : "cpus",
+        "amount" : 1.0
+      },
+      "gpu" : {
+        "resourceName" : "gpu",
+        "amount" : 1.0
+      }
+    }
+  }, {
+    "id" : 1,
+    "executorResources" : {
+      "cores" : {
+        "resourceName" : "cores",
+        "amount" : 4,
+        "discoveryScript" : "",
+        "vendor" : ""
+      },
+      "gpu" : {
+        "resourceName" : "gpu",
+        "amount" : 1,
+        "discoveryScript" : "./getGpus",
+        "vendor" : ""
+      }
+    },
+    "taskResources" : {
+      "cpus" : {
+        "resourceName" : "cpus",
+        "amount" : 1.0
+      },
+      "gpu" : {
+        "resourceName" : "gpu",
+        "amount" : 1.0
+      }
+    }
+  }, {
+    "id" : 2,
+    "executorResources" : {
+      "cores" : {
+        "resourceName" : "cores",
+        "amount" : 2,
+        "discoveryScript" : "",
+        "vendor" : ""
+      }
+    },
+    "taskResources" : {
+      "cpus" : {
+        "resourceName" : "cpus",
+        "amount" : 2.0
+      }
+    }
+  }, {
+    "id" : 3,
+    "executorResources" : {
+      "cores" : {
+        "resourceName" : "cores",
+        "amount" : 4,
+        "discoveryScript" : "",
+        "vendor" : ""
+      },
+      "gpu" : {
+        "resourceName" : "gpu",
+        "amount" : 1,
+        "discoveryScript" : "./getGpus",
+        "vendor" : ""
+      }
+    },
+    "taskResources" : {
+      "cpus" : {
+        "resourceName" : "cpus",
+        "amount" : 2.0
+      },
+      "gpu" : {
+        "resourceName" : "gpu",
+        "amount" : 1.0
+      }
+    }
+  } ]
+}
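
The new expectation file above is the JSON that the environment REST endpoint now returns under "resourceProfiles". As a minimal sketch, a profile shaped like id 1 could be built with the stage-level scheduling builder API (the same calls this patch exercises in JsonProtocolSuite); the trailing `rdd` usage is an assumption, not part of this patch:

    import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfileBuilder, TaskResourceRequests}

    // Executor side: 4 cores plus one GPU located via a discovery script.
    val execReqs = new ExecutorResourceRequests().cores(4).resource("gpu", 1, "./getGpus")
    // Task side: each task needs 1 cpu and 1 gpu.
    val taskReqs = new TaskResourceRequests().cpus(1).resource("gpu", 1)
    val rprof = new ResourceProfileBuilder().require(execReqs).require(taskReqs).build
    // Assumed usage: rdd.withResources(rprof) ties the stages computing rdd to this profile.
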
diff --git a/core/src/test/resources/HistoryServerExpectations/one_stage_attempt_json_expectation.json b/core/src/test/resources/HistoryServerExpectations/one_stage_attempt_json_expectation.json
index 7919070..3db7d55 100644
--- a/core/src/test/resources/HistoryServerExpectations/one_stage_attempt_json_expectation.json
+++ b/core/src/test/resources/HistoryServerExpectations/one_stage_attempt_json_expectation.json
@@ -462,5 +462,6 @@
       "isBlacklistedForStage" : false
     }
   },
-  "killedTasksSummary" : { }
+  "killedTasksSummary" : { },
+  "resourceProfileId" : 0
 }
diff --git a/core/src/test/resources/HistoryServerExpectations/one_stage_json_expectation.json b/core/src/test/resources/HistoryServerExpectations/one_stage_json_expectation.json
index 50d3f74..8ef3769 100644
--- a/core/src/test/resources/HistoryServerExpectations/one_stage_json_expectation.json
+++ b/core/src/test/resources/HistoryServerExpectations/one_stage_json_expectation.json
@@ -462,5 +462,6 @@
       "isBlacklistedForStage" : false
     }
   },
-  "killedTasksSummary" : { }
+  "killedTasksSummary" : { },
+  "resourceProfileId" : 0
 } ]
diff --git a/core/src/test/resources/HistoryServerExpectations/stage_list_json_expectation.json b/core/src/test/resources/HistoryServerExpectations/stage_list_json_expectation.json
index edbac71..a31c907 100644
--- a/core/src/test/resources/HistoryServerExpectations/stage_list_json_expectation.json
+++ b/core/src/test/resources/HistoryServerExpectations/stage_list_json_expectation.json
@@ -41,7 +41,8 @@
   "schedulingPool" : "default",
   "rddIds" : [ 6, 5 ],
   "accumulatorUpdates" : [ ],
-  "killedTasksSummary" : { }
+  "killedTasksSummary" : { },
+  "resourceProfileId" : 0
 }, {
   "status" : "FAILED",
   "stageId" : 2,
@@ -86,7 +87,8 @@
   "schedulingPool" : "default",
   "rddIds" : [ 3, 2 ],
   "accumulatorUpdates" : [ ],
-  "killedTasksSummary" : { }
+  "killedTasksSummary" : { },
+  "resourceProfileId" : 0
 }, {
   "status" : "COMPLETE",
   "stageId" : 1,
@@ -130,7 +132,8 @@
   "schedulingPool" : "default",
   "rddIds" : [ 1, 0 ],
   "accumulatorUpdates" : [ ],
-  "killedTasksSummary" : { }
+  "killedTasksSummary" : { },
+  "resourceProfileId" : 0
 }, {
   "status" : "COMPLETE",
   "stageId" : 0,
@@ -174,5 +177,6 @@
   "schedulingPool" : "default",
   "rddIds" : [ 0 ],
   "accumulatorUpdates" : [ ],
-  "killedTasksSummary" : { }
+  "killedTasksSummary" : { },
+  "resourceProfileId" : 0
 } ]
diff --git a/core/src/test/resources/HistoryServerExpectations/stage_list_with_accumulable_json_expectation.json b/core/src/test/resources/HistoryServerExpectations/stage_list_with_accumulable_json_expectation.json
index 836f2cb..08089d4 100644
--- a/core/src/test/resources/HistoryServerExpectations/stage_list_with_accumulable_json_expectation.json
+++ b/core/src/test/resources/HistoryServerExpectations/stage_list_with_accumulable_json_expectation.json
@@ -45,5 +45,6 @@
     "name" : "my counter",
     "value" : "5050"
   } ],
-  "killedTasksSummary" : { }
+  "killedTasksSummary" : { },
+  "resourceProfileId" : 0
 } ]
diff --git a/core/src/test/resources/HistoryServerExpectations/stage_with_accumulable_json_expectation.json b/core/src/test/resources/HistoryServerExpectations/stage_with_accumulable_json_expectation.json
index 735a825..3b5476a 100644
--- a/core/src/test/resources/HistoryServerExpectations/stage_with_accumulable_json_expectation.json
+++ b/core/src/test/resources/HistoryServerExpectations/stage_with_accumulable_json_expectation.json
@@ -506,5 +506,6 @@
       "isBlacklistedForStage" : false
     }
   },
-  "killedTasksSummary" : { }
+  "killedTasksSummary" : { },
+  "resourceProfileId" : 0
 }
diff --git a/core/src/test/resources/spark-events/application_1578436911597_0052 b/core/src/test/resources/spark-events/application_1578436911597_0052
new file mode 100644
index 0000000..c57481a
--- /dev/null
+++ b/core/src/test/resources/spark-events/application_1578436911597_0052
@@ -0,0 +1,27 @@
+{"Event":"SparkListenerLogStart","Spark Version":"3.0.0-SNAPSHOT"}
+{"Event":"SparkListenerResourceProfileAdded","Resource Profile Id":0,"Executor Resource Requests":{"cores":{"Resource Name":"cores","Amount":1,"Discovery Script":"","Vendor":""},"memory":{"Resource Name":"memory","Amount":1024,"Discovery Script":"","Vendor":""},"gpu":{"Resource Name":"gpu","Amount":1,"Discovery Script":"/home/tgraves/getGpus","Vendor":""}},"Task Resource Requests":{"cpus":{"Resource Name":"cpus","Amount":1.0},"gpu":{"Resource Name":"gpu","Amount":1.0}}}
+{"Event":"SparkListenerBlockManagerAdded","Block Manager ID":{"Executor ID":"driver","Host":"10.10.10.10","Port":32957},"Maximum Memory":428762726,"Timestamp":1578764671818,"Maximum Onheap Memory":428762726,"Maximum Offheap Memory":0}
+{"Event":"SparkListenerEnvironmentUpdate","JVM Information":{"Java Home":"/usr/lib/jvm/java-8-openjdk-amd64/jre","Java Version":"1.8.0_232 (Private Build)","Scala Version":"version 2.12.10"},"Spark Properties":{},"Hadoop Properties":{},"System Properties":{}, "Classpath Entries": {}}
+{"Event":"SparkListenerApplicationStart","App Name":"Spark shell","App ID":"application_1578436911597_0052","Timestamp":1578764662851,"User":"tgraves"}
+{"Event":"SparkListenerResourceProfileAdded","Resource Profile Id":1,"Executor Resource Requests":{"cores":{"Resource Name":"cores","Amount":4,"Discovery Script":"","Vendor":""},"gpu":{"Resource Name":"gpu","Amount":1,"Discovery Script":"./getGpus","Vendor":""}},"Task Resource Requests":{"cpus":{"Resource Name":"cpus","Amount":1.0},"gpu":{"Resource Name":"gpu","Amount":1.0}}}
+{"Event":"SparkListenerResourceProfileAdded","Resource Profile Id":2,"Executor Resource Requests":{"cores":{"Resource Name":"cores","Amount":2,"Discovery Script":"","Vendor":""}},"Task Resource Requests":{"cpus":{"Resource Name":"cpus","Amount":2.0}}}
+{"Event":"SparkListenerResourceProfileAdded","Resource Profile Id":3,"Executor Resource Requests":{"cores":{"Resource Name":"cores","Amount":4,"Discovery Script":"","Vendor":""},"gpu":{"Resource Name":"gpu","Amount":1,"Discovery Script":"./getGpus","Vendor":""}},"Task Resource Requests":{"cpus":{"Resource Name":"cpus","Amount":2.0},"gpu":{"Resource Name":"gpu","Amount":1.0}}}
+{"Event":"SparkListenerJobStart","Job ID":0,"Submission Time":1578764765274,"Stage Infos":[{"Stage ID":0,"Stage Attempt ID":0,"Stage Name":"collect at <console>:29","Number of Tasks":6,"RDD Info":[{"RDD ID":1,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"1\",\"name\":\"map\"}","Callsite":"map at <console>:31","Parent IDs":[0],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Number of Partitions":6,"Number of Cached Partitions":0,"Memory Size":0," [...]
+{"Event":"SparkListenerStageSubmitted","Stage Info":{"Stage ID":0,"Stage Attempt ID":0,"Stage Name":"collect at <console>:29","Number of Tasks":6,"RDD Info":[{"RDD ID":1,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"1\",\"name\":\"map\"}","Callsite":"map at <console>:31","Parent IDs":[0],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Number of Partitions":6,"Number of Cached Partitions":0,"Memory Size":0,"Disk Size":0},{"RDD ID":0,"Name":"Paral [...]
+{"Event":"SparkListenerExecutorAdded","Timestamp":1578764769706,"Executor ID":"1","Executor Info":{"Host":"host1","Total Cores":4,"Log Urls":{"stdout":"http://host1:8042/node/containerlogs/container_1578436911597_0052_01_000002/tgraves/stdout?start=-4096","stderr":"http://host1:8042/node/containerlogs/container_1578436911597_0052_01_000002/tgraves/stderr?start=-4096"},"Attributes":{"NM_HTTP_ADDRESS":"host1:8042","USER":"tgraves","LOG_FILES":"stderr,stdout","NM_HTTP_PORT":"8042","CLUSTER_ [...]
+{"Event":"SparkListenerBlockManagerAdded","Block Manager ID":{"Executor ID":"1","Host":"host1","Port":40787},"Maximum Memory":384093388,"Timestamp":1578764769796,"Maximum Onheap Memory":384093388,"Maximum Offheap Memory":0}
+{"Event":"SparkListenerTaskStart","Stage ID":0,"Stage Attempt ID":0,"Task Info":{"Task ID":0,"Index":0,"Attempt":0,"Launch Time":1578764769858,"Executor ID":"1","Host":"host1","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Killed":false,"Accumulables":[]}}
+{"Event":"SparkListenerTaskStart","Stage ID":0,"Stage Attempt ID":0,"Task Info":{"Task ID":1,"Index":1,"Attempt":0,"Launch Time":1578764769877,"Executor ID":"1","Host":"host1","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Killed":false,"Accumulables":[]}}
+{"Event":"SparkListenerTaskStart","Stage ID":0,"Stage Attempt ID":0,"Task Info":{"Task ID":2,"Index":2,"Attempt":0,"Launch Time":1578764770507,"Executor ID":"1","Host":"host1","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Killed":false,"Accumulables":[]}}
+{"Event":"SparkListenerTaskStart","Stage ID":0,"Stage Attempt ID":0,"Task Info":{"Task ID":3,"Index":3,"Attempt":0,"Launch Time":1578764770509,"Executor ID":"1","Host":"host1","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Killed":false,"Accumulables":[]}}
+{"Event":"SparkListenerTaskEnd","Stage ID":0,"Stage Attempt ID":0,"Task Type":"ResultTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":0,"Index":0,"Attempt":0,"Launch Time":1578764769858,"Executor ID":"1","Host":"host1","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":1578764770512,"Failed":false,"Killed":false,"Accumulables":[{"ID":6,"Name":"internal.metrics.resultSerializationTime","Update":2,"Value":2,"Internal":true,"Count Failed  [...]
+{"Event":"SparkListenerTaskEnd","Stage ID":0,"Stage Attempt ID":0,"Task Type":"ResultTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":1,"Index":1,"Attempt":0,"Launch Time":1578764769877,"Executor ID":"1","Host":"host1","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":1578764770515,"Failed":false,"Killed":false,"Accumulables":[{"ID":6,"Name":"internal.metrics.resultSerializationTime","Update":2,"Value":4,"Internal":true,"Count Failed  [...]
+{"Event":"SparkListenerTaskStart","Stage ID":0,"Stage Attempt ID":0,"Task Info":{"Task ID":4,"Index":4,"Attempt":0,"Launch Time":1578764770525,"Executor ID":"1","Host":"host1","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Killed":false,"Accumulables":[]}}
+{"Event":"SparkListenerTaskEnd","Stage ID":0,"Stage Attempt ID":0,"Task Type":"ResultTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":2,"Index":2,"Attempt":0,"Launch Time":1578764770507,"Executor ID":"1","Host":"host1","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":1578764770526,"Failed":false,"Killed":false,"Accumulables":[{"ID":4,"Name":"internal.metrics.resultSize","Update":3636,"Value":11064,"Internal":true,"Count Failed Values [...]
+{"Event":"SparkListenerTaskStart","Stage ID":0,"Stage Attempt ID":0,"Task Info":{"Task ID":5,"Index":5,"Attempt":0,"Launch Time":1578764770527,"Executor ID":"1","Host":"host1","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Killed":false,"Accumulables":[]}}
+{"Event":"SparkListenerTaskEnd","Stage ID":0,"Stage Attempt ID":0,"Task Type":"ResultTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":3,"Index":3,"Attempt":0,"Launch Time":1578764770509,"Executor ID":"1","Host":"host1","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":1578764770529,"Failed":false,"Killed":false,"Accumulables":[{"ID":4,"Name":"internal.metrics.resultSize","Update":3620,"Value":14684,"Internal":true,"Count Failed Values [...]
+{"Event":"SparkListenerTaskEnd","Stage ID":0,"Stage Attempt ID":0,"Task Type":"ResultTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":4,"Index":4,"Attempt":0,"Launch Time":1578764770525,"Executor ID":"1","Host":"host1","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":1578764770542,"Failed":false,"Killed":false,"Accumulables":[{"ID":4,"Name":"internal.metrics.resultSize","Update":3636,"Value":18320,"Internal":true,"Count Failed Values [...]
+{"Event":"SparkListenerTaskEnd","Stage ID":0,"Stage Attempt ID":0,"Task Type":"ResultTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":5,"Index":5,"Attempt":0,"Launch Time":1578764770527,"Executor ID":"1","Host":"host1","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":1578764770542,"Failed":false,"Killed":false,"Accumulables":[{"ID":4,"Name":"internal.metrics.resultSize","Update":3636,"Value":21956,"Internal":true,"Count Failed Values [...]
+{"Event":"SparkListenerStageCompleted","Stage Info":{"Stage ID":0,"Stage Attempt ID":0,"Stage Name":"collect at <console>:29","Number of Tasks":6,"RDD Info":[{"RDD ID":1,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"1\",\"name\":\"map\"}","Callsite":"map at <console>:31","Parent IDs":[0],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Number of Partitions":6,"Number of Cached Partitions":0,"Memory Size":0,"Disk Size":0},{"RDD ID":0,"Name":"Paral [...]
+{"Event":"SparkListenerJobEnd","Job ID":0,"Completion Time":1578764770546,"Job Result":{"Result":"JobSucceeded"}}
+{"Event":"SparkListenerApplicationEnd","Timestamp":1578764802615}
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index 807f0eb..8037f4a 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -1442,7 +1442,7 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {
       conf: SparkConf,
       clock: Clock = new SystemClock()): ExecutorAllocationManager = {
     ResourceProfile.reInitDefaultProfile(conf)
-    rpManager = new ResourceProfileManager(conf)
+    rpManager = new ResourceProfileManager(conf, listenerBus)
     val manager = new ExecutorAllocationManager(client, listenerBus, conf, clock = clock,
       resourceProfileManager = rpManager)
     managers += manager
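
The suites are updated because ResourceProfileManager now takes a listener bus, which it uses to post SparkListenerResourceProfileAdded when a profile is registered. A minimal sketch of the new wiring, mirroring the test setup in this patch (ResourceProfileManager is Spark-internal, so this shape only applies inside Spark's own packages):

    import org.apache.spark.SparkConf
    import org.apache.spark.resource.ResourceProfileManager
    import org.apache.spark.scheduler.LiveListenerBus

    val conf = new SparkConf()
    val listenerBus = new LiveListenerBus(conf)
    // New two-argument constructor introduced by this patch.
    val rpManager = new ResourceProfileManager(conf, listenerBus)
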
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
index c55b29b..8c2dfff 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
@@ -171,6 +171,7 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
     "executor node blacklisting unblacklisting" -> "applications/app-20161115172038-0000/executors",
     "executor memory usage" -> "applications/app-20161116163331-0000/executors",
     "executor resource information" -> "applications/application_1555004656427_0144/executors",
+    "multiple resource profiles" -> "applications/application_1578436911597_0052/environment",
 
     "app environment" -> "applications/app-20161116163331-0000/environment",
 
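
The expectation registered above is served from the per-application environment endpoint. As a hedged example, the same payload could be fetched from a live history server, assuming the default localhost:18080 address:

    import scala.io.Source

    val url = "http://localhost:18080/api/v1/applications/" +
      "application_1578436911597_0052/environment"
    // The response JSON now carries the "resourceProfiles" array shown earlier.
    println(Source.fromURL(url).mkString)
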
diff --git a/core/src/test/scala/org/apache/spark/resource/ResourceProfileManagerSuite.scala b/core/src/test/scala/org/apache/spark/resource/ResourceProfileManagerSuite.scala
index 004618a..f452173 100644
--- a/core/src/test/scala/org/apache/spark/resource/ResourceProfileManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/resource/ResourceProfileManagerSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.resource
 import org.apache.spark.{SparkConf, SparkException, SparkFunSuite}
 import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.Tests._
+import org.apache.spark.scheduler.LiveListenerBus
 
 class ResourceProfileManagerSuite extends SparkFunSuite {
 
@@ -39,9 +40,11 @@ class ResourceProfileManagerSuite extends SparkFunSuite {
     }
   }
 
+  val listenerBus = new LiveListenerBus(new SparkConf())
+
   test("ResourceProfileManager") {
     val conf = new SparkConf().set(EXECUTOR_CORES, 4)
-    val rpmanager = new ResourceProfileManager(conf)
+    val rpmanager = new ResourceProfileManager(conf, listenerBus)
     val defaultProf = rpmanager.defaultResourceProfile
     assert(defaultProf.id === ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
     assert(defaultProf.executorResources.size === 2,
@@ -53,7 +56,7 @@ class ResourceProfileManagerSuite extends SparkFunSuite {
   test("isSupported yarn no dynamic allocation") {
     val conf = new SparkConf().setMaster("yarn").set(EXECUTOR_CORES, 4)
     conf.set(RESOURCE_PROFILE_MANAGER_TESTING.key, "true")
-    val rpmanager = new ResourceProfileManager(conf)
+    val rpmanager = new ResourceProfileManager(conf, listenerBus)
     // default profile should always work
     val defaultProf = rpmanager.defaultResourceProfile
     val rprof = new ResourceProfileBuilder()
@@ -71,7 +74,7 @@ class ResourceProfileManagerSuite extends SparkFunSuite {
     val conf = new SparkConf().setMaster("yarn").set(EXECUTOR_CORES, 4)
     conf.set(DYN_ALLOCATION_ENABLED, true)
     conf.set(RESOURCE_PROFILE_MANAGER_TESTING.key, "true")
-    val rpmanager = new ResourceProfileManager(conf)
+    val rpmanager = new ResourceProfileManager(conf, listenerBus)
     // default profile should always work
     val defaultProf = rpmanager.defaultResourceProfile
     val rprof = new ResourceProfileBuilder()
@@ -84,7 +87,7 @@ class ResourceProfileManagerSuite extends SparkFunSuite {
   test("isSupported yarn with local mode") {
     val conf = new SparkConf().setMaster("local").set(EXECUTOR_CORES, 4)
     conf.set(RESOURCE_PROFILE_MANAGER_TESTING.key, "true")
-    val rpmanager = new ResourceProfileManager(conf)
+    val rpmanager = new ResourceProfileManager(conf, listenerBus)
     // default profile should always work
     val defaultProf = rpmanager.defaultResourceProfile
     val rprof = new ResourceProfileBuilder()
@@ -100,7 +103,7 @@ class ResourceProfileManagerSuite extends SparkFunSuite {
 
   test("ResourceProfileManager has equivalent profile") {
     val conf = new SparkConf().set(EXECUTOR_CORES, 4)
-    val rpmanager = new ResourceProfileManager(conf)
+    val rpmanager = new ResourceProfileManager(conf, listenerBus)
     var rpAlreadyExist: Option[ResourceProfile] = None
     val checkId = 500
     for (i <- 1 to 1000) {
diff --git a/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala b/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala
index 7711934..5d34a56 100644
--- a/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala
@@ -92,7 +92,8 @@ class StagePageSuite extends SparkFunSuite with LocalSparkContext {
         accumulatorUpdates = Seq(new UIAccumulableInfo(0L, "acc", None, "value")),
         tasks = None,
         executorSummary = None,
-        killedTasksSummary = Map.empty
+        killedTasksSummary = Map.empty,
+        ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
       )
       val taskTable = new TaskPagedTable(
         stageData,
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index eb7f307..bc7f8b5 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -32,7 +32,7 @@ import org.apache.spark._
 import org.apache.spark.executor._
 import org.apache.spark.metrics.ExecutorMetricType
 import org.apache.spark.rdd.RDDOperationScope
-import org.apache.spark.resource.{ResourceInformation, ResourceProfile, ResourceUtils}
+import org.apache.spark.resource._
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.ExecutorInfo
 import org.apache.spark.shuffle.MetadataFetchFailedException
@@ -92,7 +92,7 @@ class JsonProtocolSuite extends SparkFunSuite {
       42L, "Garfield", Some("appAttempt"), Some(logUrlMap))
     val applicationEnd = SparkListenerApplicationEnd(42L)
     val executorAdded = SparkListenerExecutorAdded(executorAddedTime, "exec1",
-      new ExecutorInfo("Hostee.awesome.com", 11, logUrlMap, attributes, resources.toMap))
+      new ExecutorInfo("Hostee.awesome.com", 11, logUrlMap, attributes, resources.toMap, 4))
     val executorRemoved = SparkListenerExecutorRemoved(executorRemovedTime, "exec2", "test reason")
     val executorBlacklisted = SparkListenerExecutorBlacklisted(executorBlacklistedTime, "exec1", 22)
     val executorUnblacklisted =
@@ -119,6 +119,14 @@ class JsonProtocolSuite extends SparkFunSuite {
       SparkListenerStageExecutorMetrics("1", 2, 3,
         new ExecutorMetrics(Array(543L, 123456L, 12345L, 1234L, 123L, 12L, 432L,
           321L, 654L, 765L, 256912L, 123456L, 123456L, 61728L, 30364L, 15182L, 10L, 90L, 2L, 20L)))
+    val rprofBuilder = new ResourceProfileBuilder()
+    val taskReq = new TaskResourceRequests().cpus(1).resource("gpu", 1)
+    val execReq =
+      new ExecutorResourceRequests().cores(2).resource("gpu", 2, "myscript")
+    rprofBuilder.require(taskReq).require(execReq)
+    val resourceProfile = rprofBuilder.build
+    resourceProfile.setResourceProfileId(21)
+    val resourceProfileAdded = SparkListenerResourceProfileAdded(resourceProfile)
     testEvent(stageSubmitted, stageSubmittedJsonString)
     testEvent(stageCompleted, stageCompletedJsonString)
     testEvent(taskStart, taskStartJsonString)
@@ -144,6 +152,7 @@ class JsonProtocolSuite extends SparkFunSuite {
     testEvent(executorMetricsUpdate, executorMetricsUpdateJsonString)
     testEvent(blockUpdated, blockUpdatedJsonString)
     testEvent(stageExecutorMetrics, stageExecutorMetricsJsonString)
+    testEvent(resourceProfileAdded, resourceProfileJsonString)
   }
 
   test("Dependent Classes") {
@@ -231,6 +240,20 @@ class JsonProtocolSuite extends SparkFunSuite {
     assert(0 === newInfo.accumulables.size)
   }
 
+  test("StageInfo resourceProfileId") {
+    val info = makeStageInfo(1, 2, 3, 4L, 5L, 5)
+    val json = JsonProtocol.stageInfoToJson(info)
+
+    // Fields added after 1.0.0.
+    assert(info.details.nonEmpty)
+    assert(info.resourceProfileId === 5)
+
+    val newInfo = JsonProtocol.stageInfoFromJson(json)
+
+    assert(info.name === newInfo.name)
+    assert(5 === newInfo.resourceProfileId)
+  }
+
   test("InputMetrics backward compatibility") {
     // InputMetrics were added after 1.0.1.
     val metrics = makeTaskMetrics(1L, 2L, 3L, 4L, 5, 6, hasHadoopInput = true, hasOutput = false)
@@ -865,6 +888,10 @@ private[spark] object JsonProtocolSuite extends Assertions {
     assert(ste1.getFileName === ste2.getFileName)
   }
 
+  private def assertEquals(rp1: ResourceProfile, rp2: ResourceProfile): Unit = {
+    assert(rp1 === rp2)
+  }
+
   /** ----------------------------------- *
    | Util methods for constructing events |
    * ------------------------------------ */
@@ -895,10 +922,16 @@ private[spark] object JsonProtocolSuite extends Assertions {
     r
   }
 
-  private def makeStageInfo(a: Int, b: Int, c: Int, d: Long, e: Long) = {
+  private def makeStageInfo(
+      a: Int,
+      b: Int,
+      c: Int,
+      d: Long,
+      e: Long,
+      rpId: Int = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID) = {
     val rddInfos = (0 until a % 5).map { i => makeRddInfo(a + i, b + i, c + i, d + i, e + i) }
     val stageInfo = new StageInfo(a, 0, "greetings", b, rddInfos, Seq(100, 200, 300), "details",
-      resourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
+      resourceProfileId = rpId)
     val (acc1, acc2) = (makeAccumulableInfo(1), makeAccumulableInfo(2))
     stageInfo.accumulables(acc1.id) = acc1
     stageInfo.accumulables(acc2.id) = acc2
@@ -1034,7 +1067,8 @@ private[spark] object JsonProtocolSuite extends Assertions {
       |        "Internal": false,
       |        "Count Failed Values": false
       |      }
-      |    ]
+      |    ],
+      |    "Resource Profile Id" : 0
       |  },
       |  "Properties": {
       |    "France": "Paris",
@@ -1091,7 +1125,8 @@ private[spark] object JsonProtocolSuite extends Assertions {
       |        "Internal": false,
       |        "Count Failed Values": false
       |      }
-      |    ]
+      |    ],
+      |    "Resource Profile Id" : 0
       |  }
       |}
     """.stripMargin
@@ -1613,7 +1648,8 @@ private[spark] object JsonProtocolSuite extends Assertions {
       |          "Internal": false,
       |          "Count Failed Values": false
       |        }
-      |      ]
+      |      ],
+      |      "Resource Profile Id" : 0
       |    },
       |    {
       |      "Stage ID": 2,
@@ -1673,7 +1709,8 @@ private[spark] object JsonProtocolSuite extends Assertions {
       |          "Internal": false,
       |          "Count Failed Values": false
       |        }
-      |      ]
+      |      ],
+      |      "Resource Profile Id" : 0
       |    },
       |    {
       |      "Stage ID": 3,
@@ -1749,7 +1786,8 @@ private[spark] object JsonProtocolSuite extends Assertions {
       |          "Internal": false,
       |          "Count Failed Values": false
       |        }
-      |      ]
+      |      ],
+      |      "Resource Profile Id" : 0
       |    },
       |    {
       |      "Stage ID": 4,
@@ -1841,7 +1879,8 @@ private[spark] object JsonProtocolSuite extends Assertions {
       |          "Internal": false,
       |          "Count Failed Values": false
       |        }
-      |      ]
+      |      ],
+      |      "Resource Profile Id" : 0
       |    }
       |  ],
       |  "Stage IDs": [
@@ -1988,7 +2027,8 @@ private[spark] object JsonProtocolSuite extends Assertions {
       |        "name" : "gpu",
       |        "addresses" : [ "0", "1" ]
       |      }
-      |    }
+      |    },
+      |    "Resource Profile Id": 4
       |  }
       |}
     """.stripMargin
@@ -2334,6 +2374,38 @@ private[spark] object JsonProtocolSuite extends Assertions {
       |  "hostId" : "node1"
       |}
     """.stripMargin
+  private val resourceProfileJsonString =
+    """
+      |{
+      |  "Event":"SparkListenerResourceProfileAdded",
+      |  "Resource Profile Id":21,
+      |  "Executor Resource Requests":{
+      |    "cores" : {
+      |      "Resource Name":"cores",
+      |      "Amount":2,
+      |      "Discovery Script":"",
+      |      "Vendor":""
+      |    },
+      |    "gpu":{
+      |      "Resource Name":"gpu",
+      |      "Amount":2,
+      |      "Discovery Script":"myscript",
+      |      "Vendor":""
+      |    }
+      |  },
+      |  "Task Resource Requests":{
+      |    "cpus":{
+      |      "Resource Name":"cpus",
+      |      "Amount":1.0
+      |    },
+      |    "gpu":{
+      |      "Resource Name":"gpu",
+      |      "Amount":1.0
+      |    }
+      |  }
+      |}
+    """.stripMargin
+
 }
 
 case class TestListenerEvent(foo: String, bar: Int) extends SparkListenerEvent
diff --git a/dev/.rat-excludes b/dev/.rat-excludes
index f997c9b..4faaaec 100644
--- a/dev/.rat-excludes
+++ b/dev/.rat-excludes
@@ -124,3 +124,4 @@ vote.tmpl
 SessionManager.java
 SessionHandler.java
 GangliaReporter.java
+application_1578436911597_0052
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
index 8c683e8..894e1e4 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
@@ -33,7 +33,7 @@ import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.deploy.k8s.Fabric8Aliases._
 import org.apache.spark.resource.ResourceProfileManager
 import org.apache.spark.rpc.{RpcEndpoint, RpcEndpointRef, RpcEnv}
-import org.apache.spark.scheduler.{ExecutorKilled, TaskSchedulerImpl}
+import org.apache.spark.scheduler.{ExecutorKilled, LiveListenerBus, TaskSchedulerImpl}
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RemoveExecutor
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.scheduler.cluster.k8s.ExecutorLifecycleTestUtils.TEST_SPARK_APP_ID
@@ -87,7 +87,8 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
   private var driverEndpoint: ArgumentCaptor[RpcEndpoint] = _
   private var schedulerBackendUnderTest: KubernetesClusterSchedulerBackend = _
 
-  private val resourceProfileManager = new ResourceProfileManager(sparkConf)
+  private val listenerBus = new LiveListenerBus(new SparkConf())
+  private val resourceProfileManager = new ResourceProfileManager(sparkConf, listenerBus)
 
   before {
     MockitoAnnotations.initMocks(this)

