You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by tg...@apache.org on 2020/04/02 13:30:47 UTC

[spark] branch master updated: [SPARK-29153][CORE] Add ability to merge resource profiles within a stage with Stage Level Scheduling

This is an automated email from the ASF dual-hosted git repository.

tgraves pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 55dea9b  [SPARK-29153][CORE] Add ability to merge resource profiles within a stage with Stage Level Scheduling
55dea9b is described below

commit 55dea9be62019d64d5d76619e1551956c8bb64d0
Author: Thomas Graves <tg...@apache.org>
AuthorDate: Thu Apr 2 08:30:18 2020 -0500

    [SPARK-29153][CORE] Add ability to merge resource profiles within a stage with Stage Level Scheduling
    
    ### What changes were proposed in this pull request?
    
    For the stage level scheduling feature, add the ability to optionally merged resource profiles if they were specified on multiple RDD within a stage.  There is a config to enable this feature, its off by default (spark.scheduler.resourceProfile.mergeConflicts). When the config is set to true, Spark will merge the profiles selecting the max value of each resource (cores, memory, gpu, etc).  further documentation will be added with SPARK-30322.
    
    This also added in the ability to check if an equivalent resource profile already exists. This is so that if a user is running stages and combining the same profiles over and over again we don't get an explosion in the number of profiles.
    
    ### Why are the changes needed?
    
    To allow users to specify resource on multiple RDD and not worry as much about if they go into the same stage and fail.
    
    ### Does this PR introduce any user-facing change?
    
    Yes, when the config is turned on it now merges the profiles instead of errorring out.
    
    ### How was this patch tested?
    
    Unit tests
    
    Closes #28053 from tgravescs/SPARK-29153.
    
    Lead-authored-by: Thomas Graves <tg...@apache.org>
    Co-authored-by: Thomas Graves <tg...@nvidia.com>
    Signed-off-by: Thomas Graves <tg...@apache.org>
---
 .../org/apache/spark/internal/config/package.scala |  10 ++
 .../apache/spark/resource/ResourceProfile.scala    |   5 +
 .../spark/resource/ResourceProfileManager.scala    |  55 ++++++--
 .../org/apache/spark/scheduler/DAGScheduler.scala  |  48 ++++++-
 .../resource/ResourceProfileManagerSuite.scala     |  30 +++++
 .../spark/resource/ResourceProfileSuite.scala      |  20 +++
 .../apache/spark/scheduler/DAGSchedulerSuite.scala | 140 ++++++++++++++++++++-
 docs/configuration.md                              |  11 ++
 8 files changed, 304 insertions(+), 15 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 8f8b6ad..76791ab 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -1816,4 +1816,14 @@ package object config {
     .bytesConf(ByteUnit.BYTE)
     .createOptional
 
+  private[spark] val RESOURCE_PROFILE_MERGE_CONFLICTS =
+    ConfigBuilder("spark.scheduler.resource.profileMergeConflicts")
+      .doc("If set to true, Spark will merge ResourceProfiles when different profiles " +
+        "are specified in RDDs that get combined into a single stage. When they are merged, " +
+        "Spark chooses the maximum of each resource and creates a new ResourceProfile. The " +
+        "default of false results in Spark throwing an exception if multiple different " +
+        "ResourceProfiles are found in RDDs going into the same stage.")
+      .version("3.1.0")
+      .booleanConf
+      .createWithDefault(false)
 }
diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala b/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala
index 96c456e..97186fb 100644
--- a/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala
+++ b/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala
@@ -222,6 +222,11 @@ class ResourceProfile(
     }
   }
 
+  // check that the task resources and executor resources are equal, but id's could be different
+  private[spark] def resourcesEqual(rp: ResourceProfile): Boolean = {
+    rp.taskResources == taskResources && rp.executorResources == executorResources
+  }
+
   override def hashCode(): Int = Seq(taskResources, executorResources).hashCode()
 
   override def toString(): String = {
diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala b/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala
index fabc0bd..c3e2444 100644
--- a/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala
+++ b/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala
@@ -17,7 +17,9 @@
 
 package org.apache.spark.resource
 
-import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.locks.ReentrantReadWriteLock
+
+import scala.collection.mutable.HashMap
 
 import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.annotation.Evolving
@@ -34,7 +36,12 @@ import org.apache.spark.util.Utils.isTesting
  */
 @Evolving
 private[spark] class ResourceProfileManager(sparkConf: SparkConf) extends Logging {
-  private val resourceProfileIdToResourceProfile = new ConcurrentHashMap[Int, ResourceProfile]()
+  private val resourceProfileIdToResourceProfile = new HashMap[Int, ResourceProfile]()
+
+  private val (readLock, writeLock) = {
+    val lock = new ReentrantReadWriteLock()
+    (lock.readLock(), lock.writeLock())
+  }
 
   private val defaultProfile = ResourceProfile.getOrCreateDefaultProfile(sparkConf)
   addResourceProfile(defaultProfile)
@@ -61,10 +68,20 @@ private[spark] class ResourceProfileManager(sparkConf: SparkConf) extends Loggin
 
   def addResourceProfile(rp: ResourceProfile): Unit = {
     isSupported(rp)
-    // force the computation of maxTasks and limitingResource now so we don't have cost later
-    rp.limitingResource(sparkConf)
-    val res = resourceProfileIdToResourceProfile.putIfAbsent(rp.id, rp)
-    if (res == null) {
+    var putNewProfile = false
+    writeLock.lock()
+    try {
+      if (!resourceProfileIdToResourceProfile.contains(rp.id)) {
+        val prev = resourceProfileIdToResourceProfile.put(rp.id, rp)
+        if (prev.isEmpty) putNewProfile = true
+      }
+    } finally {
+      writeLock.unlock()
+    }
+    // do this outside the write lock only when we add a new profile
+    if (putNewProfile) {
+      // force the computation of maxTasks and limitingResource now so we don't have cost later
+      rp.limitingResource(sparkConf)
       logInfo(s"Added ResourceProfile id: ${rp.id}")
     }
   }
@@ -74,10 +91,28 @@ private[spark] class ResourceProfileManager(sparkConf: SparkConf) extends Loggin
    * it returns the default ResourceProfile created from the application level configs.
    */
   def resourceProfileFromId(rpId: Int): ResourceProfile = {
-    val rp = resourceProfileIdToResourceProfile.get(rpId)
-    if (rp == null) {
-      throw new SparkException(s"ResourceProfileId $rpId not found!")
+    readLock.lock()
+    try {
+      resourceProfileIdToResourceProfile.get(rpId).getOrElse(
+        throw new SparkException(s"ResourceProfileId $rpId not found!")
+      )
+    } finally {
+      readLock.unlock()
+    }
+  }
+
+  /*
+   * If the ResourceProfile passed in is equivalent to an existing one, return the
+   * existing one, other return None
+   */
+  def getEquivalentProfile(rp: ResourceProfile): Option[ResourceProfile] = {
+    readLock.lock()
+    try {
+      resourceProfileIdToResourceProfile.find { case (_, rpEntry) =>
+        rpEntry.resourcesEqual(rp)
+      }.map(_._2)
+    } finally {
+      readLock.unlock()
     }
-    rp
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 079cf11..c6c39d4 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -186,6 +186,8 @@ private[spark] class DAGScheduler(
   /** If enabled, FetchFailed will not cause stage retry, in order to surface the problem. */
   private val disallowStageRetryForTest = sc.getConf.get(TEST_NO_STAGE_RETRY)
 
+  private val shouldMergeResourceProfiles = sc.getConf.get(config.RESOURCE_PROFILE_MERGE_CONFLICTS)
+
   /**
    * Whether to unregister all the outputs on the host in condition that we receive a FetchFailure,
    * this is set default to false, which means, we only unregister the outputs related to the exact
@@ -447,10 +449,27 @@ private[spark] class DAGScheduler(
       stageResourceProfiles: HashSet[ResourceProfile]): ResourceProfile = {
     logDebug(s"Merging stage rdd profiles: $stageResourceProfiles")
     val resourceProfile = if (stageResourceProfiles.size > 1) {
-      // add option later to actually merge profiles - SPARK-29153
-      throw new IllegalArgumentException("Multiple ResourceProfile's specified in the RDDs for " +
-        "this stage, please resolve the conflicting ResourceProfile's as Spark doesn't" +
-        "currently support merging them.")
+      if (shouldMergeResourceProfiles) {
+        val startResourceProfile = stageResourceProfiles.head
+        val mergedProfile = stageResourceProfiles.drop(1)
+          .foldLeft(startResourceProfile)((a, b) => mergeResourceProfiles(a, b))
+        // compared merged profile with existing ones so we don't add it over and over again
+        // if the user runs the same operation multiple times
+        val resProfile = sc.resourceProfileManager.getEquivalentProfile(mergedProfile)
+        resProfile match {
+          case Some(existingRp) => existingRp
+          case None =>
+            // this ResourceProfile could be different if it was merged so we have to add it to
+            // our ResourceProfileManager
+            sc.resourceProfileManager.addResourceProfile(mergedProfile)
+            mergedProfile
+        }
+      } else {
+        throw new IllegalArgumentException("Multiple ResourceProfiles specified in the RDDs for " +
+          "this stage, either resolve the conflicting ResourceProfiles yourself or enable " +
+          s"${config.RESOURCE_PROFILE_MERGE_CONFLICTS.key} and understand how Spark handles " +
+          "the merging them.")
+      }
     } else {
       if (stageResourceProfiles.size == 1) {
         stageResourceProfiles.head
@@ -461,6 +480,27 @@ private[spark] class DAGScheduler(
     resourceProfile
   }
 
+  // This is a basic function to merge resource profiles that takes the max
+  // value of the profiles. We may want to make this more complex in the future as
+  // you may want to sum some resources (like memory).
+  private[scheduler] def mergeResourceProfiles(
+      r1: ResourceProfile,
+      r2: ResourceProfile): ResourceProfile = {
+    val mergedExecKeys = r1.executorResources ++ r2.executorResources
+    val mergedExecReq = mergedExecKeys.map { case (k, v) =>
+        val larger = r1.executorResources.get(k).map( x =>
+          if (x.amount > v.amount) x else v).getOrElse(v)
+        k -> larger
+    }
+    val mergedTaskKeys = r1.taskResources ++ r2.taskResources
+    val mergedTaskReq = mergedTaskKeys.map { case (k, v) =>
+      val larger = r1.taskResources.get(k).map( x =>
+        if (x.amount > v.amount) x else v).getOrElse(v)
+      k -> larger
+    }
+    new ResourceProfile(mergedExecReq, mergedTaskReq)
+  }
+
   /**
    * Create a ResultStage associated with the provided jobId.
    */
diff --git a/core/src/test/scala/org/apache/spark/resource/ResourceProfileManagerSuite.scala b/core/src/test/scala/org/apache/spark/resource/ResourceProfileManagerSuite.scala
index 0752603..004618a 100644
--- a/core/src/test/scala/org/apache/spark/resource/ResourceProfileManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/resource/ResourceProfileManagerSuite.scala
@@ -98,6 +98,36 @@ class ResourceProfileManagerSuite extends SparkFunSuite {
     assert(error.contains("ResourceProfiles are only supported on YARN with dynamic allocation"))
   }
 
+  test("ResourceProfileManager has equivalent profile") {
+    val conf = new SparkConf().set(EXECUTOR_CORES, 4)
+    val rpmanager = new ResourceProfileManager(conf)
+    var rpAlreadyExist: Option[ResourceProfile] = None
+    val checkId = 500
+    for (i <- 1 to 1000) {
+      val rprofBuilder = new ResourceProfileBuilder()
+      val ereqs = new ExecutorResourceRequests()
+      ereqs.cores(i).memory("4g").memoryOverhead("2000m")
+      val treqs = new TaskResourceRequests()
+      treqs.cpus(i)
+      rprofBuilder.require(ereqs).require(treqs)
+      val rprof = rprofBuilder.build
+      rpmanager.addResourceProfile(rprof)
+      if (i == checkId) rpAlreadyExist = Some(rprof)
+    }
+    val rpNotMatch = new ResourceProfileBuilder().build
+    assert(rpmanager.getEquivalentProfile(rpNotMatch).isEmpty,
+      s"resourceProfile should not have existed")
 
+    val rprofBuilder = new ResourceProfileBuilder()
+    val ereqs = new ExecutorResourceRequests()
+    ereqs.cores(checkId).memory("4g").memoryOverhead("2000m")
+    val treqs = new TaskResourceRequests()
+    treqs.cpus(checkId)
+    rprofBuilder.require(ereqs).require(treqs)
+    val rpShouldMatch = rprofBuilder.build
 
+    val equivProf = rpmanager.getEquivalentProfile(rpShouldMatch)
+    assert(equivProf.nonEmpty)
+    assert(equivProf.get.id == rpAlreadyExist.get.id, s"resourceProfile should have existed")
+  }
 }
diff --git a/core/src/test/scala/org/apache/spark/resource/ResourceProfileSuite.scala b/core/src/test/scala/org/apache/spark/resource/ResourceProfileSuite.scala
index b2f2c36..29d3ef1 100644
--- a/core/src/test/scala/org/apache/spark/resource/ResourceProfileSuite.scala
+++ b/core/src/test/scala/org/apache/spark/resource/ResourceProfileSuite.scala
@@ -193,6 +193,26 @@ class ResourceProfileSuite extends SparkFunSuite {
     assert(rprof.taskResources("cpus").amount === 1, "Task resources should have cpu")
   }
 
+  test("test ResourceProfiles equal") {
+    val rprofBuilder = new ResourceProfileBuilder()
+    val taskReq = new TaskResourceRequests().resource("gpu", 1)
+    val eReq = new ExecutorResourceRequests().resource("gpu", 2, "myscript", "nvidia")
+    rprofBuilder.require(taskReq).require(eReq)
+    val rprof = rprofBuilder.build
+
+    val rprofBuilder2 = new ResourceProfileBuilder()
+    val taskReq2 = new TaskResourceRequests().resource("gpu", 1)
+    val eReq2 = new ExecutorResourceRequests().resource("gpu", 2, "myscript", "nvidia")
+    rprofBuilder2.require(taskReq2).require(eReq2)
+    val rprof2 = rprofBuilder.build
+    rprof2.setResourceProfileId(rprof.id)
+
+    assert(rprof === rprof2, "resource profile equality not working")
+    rprof2.setResourceProfileId(rprof.id + 1)
+    assert(rprof.id != rprof2.id, "resource profiles should not have same id")
+    assert(rprof.resourcesEqual(rprof2), "resource profile resourcesEqual not working")
+  }
+
   test("Test ExecutorResourceRequests memory helpers") {
     val rprof = new ResourceProfileBuilder()
     val ereqs = new ExecutorResourceRequests()
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 33a14ce..4c6033e 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -35,6 +35,7 @@ import org.apache.spark.executor.ExecutorMetrics
 import org.apache.spark.internal.config
 import org.apache.spark.rdd.{DeterministicLevel, RDD}
 import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfile, ResourceProfileBuilder, TaskResourceRequests}
+import org.apache.spark.resource.ResourceUtils.{FPGA, GPU}
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
 import org.apache.spark.shuffle.{FetchFailedException, MetadataFetchFailedException}
 import org.apache.spark.storage.{BlockId, BlockManagerId, BlockManagerMaster}
@@ -3184,7 +3185,144 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
       scheduler.mergeResourceProfilesForStage(resourceprofiles)
     }.getMessage()
 
-    assert(error.contains("Multiple ResourceProfile's specified in the RDDs"))
+    assert(error.contains("Multiple ResourceProfiles specified in the RDDs"))
+  }
+
+  test("test 2 resource profile with merge conflict config true") {
+    afterEach()
+    val conf = new SparkConf()
+    conf.set(config.RESOURCE_PROFILE_MERGE_CONFLICTS.key, "true")
+    init(conf)
+
+    val ereqs = new ExecutorResourceRequests().cores(4)
+    val treqs = new TaskResourceRequests().cpus(1)
+    val rp1 = new ResourceProfileBuilder().require(ereqs).require(treqs).build
+
+    val ereqs2 = new ExecutorResourceRequests().cores(2)
+    val treqs2 = new TaskResourceRequests().cpus(2)
+    val rp2 = new ResourceProfileBuilder().require(ereqs2).require(treqs2).build
+
+    val rdd = sc.parallelize(1 to 10).withResources(rp1).map(x => (x, x)).withResources(rp2)
+    val (shuffledeps, resourceprofiles) = scheduler.getShuffleDependenciesAndResourceProfiles(rdd)
+    val mergedRp = scheduler.mergeResourceProfilesForStage(resourceprofiles)
+    assert(mergedRp.getTaskCpus.get == 2)
+    assert(mergedRp.getExecutorCores.get == 4)
+  }
+
+  test("test multiple resource profiles created from merging use same rp") {
+    afterEach()
+    val conf = new SparkConf()
+    conf.set(config.RESOURCE_PROFILE_MERGE_CONFLICTS.key, "true")
+    init(conf)
+
+    val ereqs = new ExecutorResourceRequests().cores(4)
+    val treqs = new TaskResourceRequests().cpus(1)
+    val rp1 = new ResourceProfileBuilder().require(ereqs).require(treqs).build
+
+    val ereqs2 = new ExecutorResourceRequests().cores(2)
+    val treqs2 = new TaskResourceRequests().cpus(2)
+    val rp2 = new ResourceProfileBuilder().require(ereqs2).require(treqs2).build
+
+    val rdd = sc.parallelize(1 to 10).withResources(rp1).map(x => (x, x)).withResources(rp2)
+    val (_, resourceprofiles) = scheduler.getShuffleDependenciesAndResourceProfiles(rdd)
+    val mergedRp = scheduler.mergeResourceProfilesForStage(resourceprofiles)
+    assert(mergedRp.getTaskCpus.get == 2)
+    assert(mergedRp.getExecutorCores.get == 4)
+
+    // test that instead of creating a new merged profile, we use the already created one
+    val rdd2 = sc.parallelize(1 to 10).withResources(rp1).map(x => (x, x)).withResources(rp2)
+    val (_, resourceprofiles2) = scheduler.getShuffleDependenciesAndResourceProfiles(rdd2)
+    val mergedRp2 = scheduler.mergeResourceProfilesForStage(resourceprofiles2)
+    assert(mergedRp2.id === mergedRp.id)
+    assert(mergedRp2.getTaskCpus.get == 2)
+    assert(mergedRp2.getExecutorCores.get == 4)
+  }
+
+  test("test merge 2 resource profiles multiple configs") {
+    val ereqs = new ExecutorResourceRequests().cores(4)
+    val treqs = new TaskResourceRequests().cpus(2)
+    val rp1 = new ResourceProfile(ereqs.requests, treqs.requests)
+    val ereqs2 = new ExecutorResourceRequests().cores(2)
+    val treqs2 = new TaskResourceRequests().cpus(1)
+    val rp2 = new ResourceProfile(ereqs2.requests, treqs2.requests)
+    var mergedRp = scheduler.mergeResourceProfiles(rp1, rp2)
+
+    assert(mergedRp.getTaskCpus.get == 2)
+    assert(mergedRp.getExecutorCores.get == 4)
+
+    val ereqs3 = new ExecutorResourceRequests().cores(1).resource(GPU, 1, "disc")
+    val treqs3 = new TaskResourceRequests().cpus(1).resource(GPU, 1)
+    val rp3 = new ResourceProfile(ereqs3.requests, treqs3.requests)
+    val ereqs4 = new ExecutorResourceRequests().cores(2)
+    val treqs4 = new TaskResourceRequests().cpus(2)
+    val rp4 = new ResourceProfile(ereqs4.requests, treqs4.requests)
+    mergedRp = scheduler.mergeResourceProfiles(rp3, rp4)
+
+    assert(mergedRp.getTaskCpus.get == 2)
+    assert(mergedRp.getExecutorCores.get == 2)
+    assert(mergedRp.executorResources.size == 2)
+    assert(mergedRp.taskResources.size == 2)
+    assert(mergedRp.executorResources.get(GPU).get.amount == 1)
+    assert(mergedRp.executorResources.get(GPU).get.discoveryScript == "disc")
+    assert(mergedRp.taskResources.get(GPU).get.amount == 1)
+
+    val ereqs5 = new ExecutorResourceRequests().cores(1).memory("3g")
+      .memoryOverhead("1g").pysparkMemory("2g").resource(GPU, 1, "disc")
+    val treqs5 = new TaskResourceRequests().cpus(1).resource(GPU, 1)
+    val rp5 = new ResourceProfile(ereqs5.requests, treqs5.requests)
+    val ereqs6 = new ExecutorResourceRequests().cores(8).resource(FPGA, 2, "fdisc")
+    val treqs6 = new TaskResourceRequests().cpus(2).resource(FPGA, 1)
+    val rp6 = new ResourceProfile(ereqs6.requests, treqs6.requests)
+    mergedRp = scheduler.mergeResourceProfiles(rp5, rp6)
+
+    assert(mergedRp.getTaskCpus.get == 2)
+    assert(mergedRp.getExecutorCores.get == 8)
+    assert(mergedRp.executorResources.size == 6)
+    assert(mergedRp.taskResources.size == 3)
+    assert(mergedRp.executorResources.get(GPU).get.amount == 1)
+    assert(mergedRp.executorResources.get(GPU).get.discoveryScript == "disc")
+    assert(mergedRp.taskResources.get(GPU).get.amount == 1)
+    assert(mergedRp.executorResources.get(FPGA).get.amount == 2)
+    assert(mergedRp.executorResources.get(FPGA).get.discoveryScript == "fdisc")
+    assert(mergedRp.taskResources.get(FPGA).get.amount == 1)
+    assert(mergedRp.executorResources.get(ResourceProfile.MEMORY).get.amount == 3072)
+    assert(mergedRp.executorResources.get(ResourceProfile.PYSPARK_MEM).get.amount == 2048)
+    assert(mergedRp.executorResources.get(ResourceProfile.OVERHEAD_MEM).get.amount == 1024)
+
+    val ereqs7 = new ExecutorResourceRequests().cores(1).memory("3g")
+      .resource(GPU, 4, "disc")
+    val treqs7 = new TaskResourceRequests().cpus(1).resource(GPU, 1)
+    val rp7 = new ResourceProfile(ereqs7.requests, treqs7.requests)
+    val ereqs8 = new ExecutorResourceRequests().cores(1).resource(GPU, 2, "fdisc")
+    val treqs8 = new TaskResourceRequests().cpus(1).resource(GPU, 2)
+    val rp8 = new ResourceProfile(ereqs8.requests, treqs8.requests)
+    mergedRp = scheduler.mergeResourceProfiles(rp7, rp8)
+
+    assert(mergedRp.getTaskCpus.get == 1)
+    assert(mergedRp.getExecutorCores.get == 1)
+    assert(mergedRp.executorResources.get(GPU).get.amount == 4)
+    assert(mergedRp.executorResources.get(GPU).get.discoveryScript == "disc")
+    assert(mergedRp.taskResources.get(GPU).get.amount == 2)
+  }
+
+  test("test merge 3 resource profiles") {
+    afterEach()
+    val conf = new SparkConf()
+    conf.set(config.RESOURCE_PROFILE_MERGE_CONFLICTS.key, "true")
+    init(conf)
+    val ereqs = new ExecutorResourceRequests().cores(4)
+    val treqs = new TaskResourceRequests().cpus(1)
+    val rp1 = new ResourceProfile(ereqs.requests, treqs.requests)
+    val ereqs2 = new ExecutorResourceRequests().cores(2)
+    val treqs2 = new TaskResourceRequests().cpus(1)
+    val rp2 = new ResourceProfile(ereqs2.requests, treqs2.requests)
+    val ereqs3 = new ExecutorResourceRequests().cores(3)
+    val treqs3 = new TaskResourceRequests().cpus(2)
+    val rp3 = new ResourceProfile(ereqs3.requests, treqs3.requests)
+    var mergedRp = scheduler.mergeResourceProfilesForStage(HashSet(rp1, rp2, rp3))
+
+    assert(mergedRp.getTaskCpus.get == 2)
+    assert(mergedRp.getExecutorCores.get == 4)
   }
 
   /**
diff --git a/docs/configuration.md b/docs/configuration.md
index a8fddbc..5081d79 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -2136,6 +2136,17 @@ Apart from these, the following properties are also available, and may be useful
   <td>3.0.0</td>
 </tr>
 <tr>
+  <td><code>spark.scheduler.resource.profileMergeConflicts</code></td>
+  <td>false</td>
+  <td>
+    If set to "true", Spark will merge ResourceProfiles when different profiles are specified
+    in RDDs that get combined into a single stage. When they are merged, Spark chooses the maximum of
+    each resource and creates a new ResourceProfile. The default of false results in Spark throwing
+    an exception if multiple different ResourceProfiles are found in RDDs going into the same stage.
+  </td>
+  <td>3.1.0</td>
+</tr>
+<tr>
   <td><code>spark.scheduler.blacklist.unschedulableTaskSetTimeout</code></td>
   <td>120s</td>
   <td>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org