You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by tg...@apache.org on 2022/03/16 15:55:18 UTC

[spark] branch master updated: [SPARK-38194][YARN][MESOS][K8S] Make memory overhead factor configurable

This is an automated email from the ASF dual-hosted git repository.

tgraves pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 71e2110  [SPARK-38194][YARN][MESOS][K8S] Make memory overhead factor configurable
71e2110 is described below

commit 71e2110b799220adc107c9ac5ce737281f2b65cc
Author: Adam Binford <ad...@gmail.com>
AuthorDate: Wed Mar 16 10:54:18 2022 -0500

    [SPARK-38194][YARN][MESOS][K8S] Make memory overhead factor configurable
    
    ### What changes were proposed in this pull request?
    
    Add a new config to set the memory overhead factor for drivers and executors. Currently the memory overhead is hard coded to 10% (except in Kubernetes), and the only way to set it higher is to set it to a specific memory amount.
    
    ### Why are the changes needed?
    
    In dynamic environments where different people or use cases have different memory requirements, it would be helpful to set a higher memory overhead factor instead of having to set a specific, higher memory overhead value. The Kubernetes resource manager already makes this configurable. This change makes it configurable across the board.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No change to default behavior, just adds a new config users can change.
    
    ### How was this patch tested?
    
    New UT to check the memory calculation.
    
    Closes #35504 from Kimahriman/yarn-configurable-memory-overhead-factor.
    
    Authored-by: Adam Binford <ad...@gmail.com>
    Signed-off-by: Thomas Graves <tg...@apache.org>
---
 .../main/scala/org/apache/spark/SparkConf.scala    |  4 +-
 .../org/apache/spark/internal/config/package.scala | 28 ++++++++++
 docs/configuration.md                              | 30 ++++++++++-
 docs/running-on-kubernetes.md                      |  9 ----
 .../k8s/features/BasicDriverFeatureStep.scala      | 13 +++--
 .../k8s/features/BasicExecutorFeatureStep.scala    |  7 ++-
 .../k8s/features/BasicDriverFeatureStepSuite.scala | 63 ++++++++++++++++++++--
 .../features/BasicExecutorFeatureStepSuite.scala   | 54 +++++++++++++++++++
 .../spark/deploy/rest/mesos/MesosRestServer.scala  |  5 +-
 .../cluster/mesos/MesosSchedulerUtils.scala        |  9 ++--
 .../deploy/rest/mesos/MesosRestServerSuite.scala   |  8 ++-
 .../org/apache/spark/deploy/yarn/Client.scala      | 14 +++--
 .../apache/spark/deploy/yarn/YarnAllocator.scala   |  5 +-
 .../spark/deploy/yarn/YarnSparkHadoopUtil.scala    |  5 +-
 .../spark/deploy/yarn/YarnAllocatorSuite.scala     | 29 ++++++++++
 15 files changed, 248 insertions(+), 35 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 5f37a1a..cf12174 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -636,7 +636,9 @@ private[spark] object SparkConf extends Logging {
       DeprecatedConfig("spark.blacklist.killBlacklistedExecutors", "3.1.0",
         "Please use spark.excludeOnFailure.killExcludedExecutors"),
       DeprecatedConfig("spark.yarn.blacklist.executor.launch.blacklisting.enabled", "3.1.0",
-        "Please use spark.yarn.executor.launch.excludeOnFailure.enabled")
+        "Please use spark.yarn.executor.launch.excludeOnFailure.enabled"),
+      DeprecatedConfig("spark.kubernetes.memoryOverheadFactor", "3.3.0",
+        "Please use spark.driver.memoryOverheadFactor and spark.executor.memoryOverheadFactor")
     )
 
     Map(configs.map { cfg => (cfg.key -> cfg) } : _*)
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index dbec61a..ffe4501 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -105,6 +105,22 @@ package object config {
     .bytesConf(ByteUnit.MiB)
     .createOptional
 
+  private[spark] val DRIVER_MEMORY_OVERHEAD_FACTOR =
+    ConfigBuilder("spark.driver.memoryOverheadFactor")
+      .doc("Fraction of driver memory to be allocated as additional non-heap memory per driver " +
+        "process in cluster mode. This is memory that accounts for things like VM overheads, " +
+        "interned strings, other native overheads, etc. This tends to grow with the container " +
+        "size. This value defaults to 0.10 except for Kubernetes non-JVM jobs, which defaults to " +
+        "0.40. This is done as non-JVM tasks need more non-JVM heap space and such tasks " +
+        "commonly fail with \"Memory Overhead Exceeded\" errors. This preempts this error " +
+        "with a higher default. This value is ignored if spark.driver.memoryOverhead is set " +
+        "directly.")
+      .version("3.3.0")
+      .doubleConf
+      .checkValue(factor => factor > 0,
+        "Ensure that memory overhead is a double greater than 0")
+      .createWithDefault(0.1)
+
   private[spark] val DRIVER_LOG_DFS_DIR =
     ConfigBuilder("spark.driver.log.dfsDir").version("3.0.0").stringConf.createOptional
 
@@ -315,6 +331,18 @@ package object config {
     .bytesConf(ByteUnit.MiB)
     .createOptional
 
+  private[spark] val EXECUTOR_MEMORY_OVERHEAD_FACTOR =
+    ConfigBuilder("spark.executor.memoryOverheadFactor")
+      .doc("Fraction of executor memory to be allocated as additional non-heap memory per " +
+        "executor process. This is memory that accounts for things like VM overheads, " +
+        "interned strings, other native overheads, etc. This tends to grow with the container " +
+        "size. This value is ignored if spark.executor.memoryOverhead is set directly.")
+      .version("3.3.0")
+      .doubleConf
+      .checkValue(factor => factor > 0,
+        "Ensure that memory overhead is a double greater than 0")
+      .createWithDefault(0.1)
+
   private[spark] val CORES_MAX = ConfigBuilder("spark.cores.max")
     .doc("When running on a standalone deploy cluster or a Mesos cluster in coarse-grained " +
       "sharing mode, the maximum amount of CPU cores to request for the application from across " +
diff --git a/docs/configuration.md b/docs/configuration.md
index ae3f422..a2e6797 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -183,7 +183,7 @@ of the most common options to set are:
 </tr>
 <tr>
   <td><code>spark.driver.memoryOverhead</code></td>
-  <td>driverMemory * 0.10, with minimum of 384 </td>
+  <td>driverMemory * <code>spark.driver.memoryOverheadFactor</code>, with minimum of 384 </td>
   <td>
     Amount of non-heap memory to be allocated per driver process in cluster mode, in MiB unless
     otherwise specified. This is memory that accounts for things like VM overheads, interned strings,
@@ -199,6 +199,21 @@ of the most common options to set are:
   <td>2.3.0</td>
 </tr>
 <tr>
+  <td><code>spark.driver.memoryOverheadFactor</code></td>
+  <td>0.10</td>
+  <td>
+    Fraction of driver memory to be allocated as additional non-heap memory per driver process in cluster mode.
+    This is memory that accounts for things like VM overheads, interned strings,
+    other native overheads, etc. This tends to grow with the container size.
+    This value defaults to 0.10 except for Kubernetes non-JVM jobs, which default to
+    0.40. This is done as non-JVM tasks need more non-JVM heap space and such tasks
+    commonly fail with "Memory Overhead Exceeded" errors. This preempts this error
+    with a higher default.
+    This value is ignored if <code>spark.driver.memoryOverhead</code> is set directly.
+  </td>
+  <td>3.3.0</td>
+</tr>
+<tr>
  <td><code>spark.driver.resource.{resourceName}.amount</code></td>
   <td>0</td>
   <td>
@@ -272,7 +287,7 @@ of the most common options to set are:
 </tr>
 <tr>
  <td><code>spark.executor.memoryOverhead</code></td>
-  <td>executorMemory * 0.10, with minimum of 384 </td>
+  <td>executorMemory * <code>spark.executor.memoryOverheadFactor</code>, with minimum of 384 </td>
   <td>
     Amount of additional memory to be allocated per executor process, in MiB unless otherwise specified.
     This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc.
@@ -288,6 +303,17 @@ of the most common options to set are:
   <td>2.3.0</td>
 </tr>
 <tr>
+  <td><code>spark.executor.memoryOverheadFactor</code></td>
+  <td>0.10</td>
+  <td>
+    Fraction of executor memory to be allocated as additional non-heap memory per executor process.
+    This is memory that accounts for things like VM overheads, interned strings,
+    other native overheads, etc. This tends to grow with the container size.
+    This value is ignored if <code>spark.executor.memoryOverhead</code> is set directly.
+  </td>
+  <td>3.3.0</td>
+</tr>
+<tr>
  <td><code>spark.executor.resource.{resourceName}.amount</code></td>
   <td>0</td>
   <td>
diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md
index a5da80a..de37e22 100644
--- a/docs/running-on-kubernetes.md
+++ b/docs/running-on-kubernetes.md
@@ -1138,15 +1138,6 @@ See the [configuration page](configuration.html) for information on Spark config
   <td>3.0.0</td>
 </tr>
 <tr>
-  <td><code>spark.kubernetes.memoryOverheadFactor</code></td>
-  <td><code>0.1</code></td>
-  <td>
-    This sets the Memory Overhead Factor that will allocate memory to non-JVM memory, which includes off-heap memory allocations, non-JVM tasks, various systems processes, and <code>tmpfs</code>-based local directories when <code>spark.kubernetes.local.dirs.tmpfs</code> is <code>true</code>. For JVM-based jobs this value will default to 0.10 and 0.40 for non-JVM jobs.
-    This is done as non-JVM tasks need more non-JVM heap space and such tasks commonly fail with "Memory Overhead Exceeded" errors. This preempts this error with a higher default.
-  </td>
-  <td>2.4.0</td>
-</tr>
-<tr>
   <td><code>spark.kubernetes.pyspark.pythonVersion</code></td>
   <td><code>"3"</code></td>
   <td>
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
index 3b2b561..9715149 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
@@ -53,18 +53,23 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf)
 
   // Memory settings
   private val driverMemoryMiB = conf.get(DRIVER_MEMORY)
+  private val memoryOverheadFactor = if (conf.contains(DRIVER_MEMORY_OVERHEAD_FACTOR)) {
+    conf.get(DRIVER_MEMORY_OVERHEAD_FACTOR)
+  } else {
+    conf.get(MEMORY_OVERHEAD_FACTOR)
+  }
 
   // The memory overhead factor to use. If the user has not set it, then use a different
   // value for non-JVM apps. This value is propagated to executors.
   private val overheadFactor =
     if (conf.mainAppResource.isInstanceOf[NonJVMResource]) {
-      if (conf.contains(MEMORY_OVERHEAD_FACTOR)) {
-        conf.get(MEMORY_OVERHEAD_FACTOR)
+      if (conf.contains(MEMORY_OVERHEAD_FACTOR) || conf.contains(DRIVER_MEMORY_OVERHEAD_FACTOR)) {
+        memoryOverheadFactor
       } else {
         NON_JVM_MEMORY_OVERHEAD_FACTOR
       }
     } else {
-      conf.get(MEMORY_OVERHEAD_FACTOR)
+      memoryOverheadFactor
     }
 
   private val memoryOverheadMiB = conf
@@ -164,7 +169,7 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf)
       KUBERNETES_DRIVER_POD_NAME.key -> driverPodName,
       "spark.app.id" -> conf.appId,
       KUBERNETES_DRIVER_SUBMIT_CHECK.key -> "true",
-      MEMORY_OVERHEAD_FACTOR.key -> overheadFactor.toString)
+      DRIVER_MEMORY_OVERHEAD_FACTOR.key -> overheadFactor.toString)
     // try upload local, resolvable files to a hadoop compatible file system
     Seq(JARS, FILES, ARCHIVES, SUBMIT_PYTHON_FILES).foreach { key =>
       val uris = conf.get(key).filter(uri => KubernetesUtils.isLocalAndResolvable(uri))
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
index a762519..15c69ad 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
@@ -59,11 +59,16 @@ private[spark] class BasicExecutorFeatureStep(
   private val isDefaultProfile = resourceProfile.id == ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
   private val isPythonApp = kubernetesConf.get(APP_RESOURCE_TYPE) == Some(APP_RESOURCE_TYPE_PYTHON)
   private val disableConfigMap = kubernetesConf.get(KUBERNETES_EXECUTOR_DISABLE_CONFIGMAP)
+  private val memoryOverheadFactor = if (kubernetesConf.contains(EXECUTOR_MEMORY_OVERHEAD_FACTOR)) {
+    kubernetesConf.get(EXECUTOR_MEMORY_OVERHEAD_FACTOR)
+  } else {
+    kubernetesConf.get(MEMORY_OVERHEAD_FACTOR)
+  }
 
   val execResources = ResourceProfile.getResourcesForClusterManager(
     resourceProfile.id,
     resourceProfile.executorResources,
-    kubernetesConf.get(MEMORY_OVERHEAD_FACTOR),
+    memoryOverheadFactor,
     kubernetesConf.sparkConf,
     isPythonApp,
     Map.empty)
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
index bf7fbcc..d45f5f9 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
@@ -134,7 +134,7 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
       KUBERNETES_DRIVER_POD_NAME.key -> "spark-driver-pod",
       "spark.app.id" -> KubernetesTestConf.APP_ID,
       "spark.kubernetes.submitInDriver" -> "true",
-      MEMORY_OVERHEAD_FACTOR.key -> MEMORY_OVERHEAD_FACTOR.defaultValue.get.toString)
+      DRIVER_MEMORY_OVERHEAD_FACTOR.key -> DRIVER_MEMORY_OVERHEAD_FACTOR.defaultValue.get.toString)
     assert(featureStep.getAdditionalPodSystemProperties() === expectedSparkConf)
   }
 
@@ -193,7 +193,7 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
   // Memory overhead tests. Tuples are:
   //   test name, main resource, overhead factor, expected factor
   Seq(
-    ("java", JavaMainAppResource(None), None, MEMORY_OVERHEAD_FACTOR.defaultValue.get),
+    ("java", JavaMainAppResource(None), None, DRIVER_MEMORY_OVERHEAD_FACTOR.defaultValue.get),
     ("python default", PythonMainAppResource(null), None, NON_JVM_MEMORY_OVERHEAD_FACTOR),
     ("python w/ override", PythonMainAppResource(null), Some(0.9d), 0.9d),
     ("r default", RMainAppResource(null), None, NON_JVM_MEMORY_OVERHEAD_FACTOR)
@@ -201,13 +201,13 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
     test(s"memory overhead factor: $name") {
       // Choose a driver memory where the default memory overhead is > MEMORY_OVERHEAD_MIN_MIB
       val driverMem =
-        ResourceProfile.MEMORY_OVERHEAD_MIN_MIB / MEMORY_OVERHEAD_FACTOR.defaultValue.get * 2
+        ResourceProfile.MEMORY_OVERHEAD_MIN_MIB / DRIVER_MEMORY_OVERHEAD_FACTOR.defaultValue.get * 2
 
       // main app resource, overhead factor
       val sparkConf = new SparkConf(false)
         .set(CONTAINER_IMAGE, "spark-driver:latest")
         .set(DRIVER_MEMORY.key, s"${driverMem.toInt}m")
-      factor.foreach { value => sparkConf.set(MEMORY_OVERHEAD_FACTOR, value) }
+      factor.foreach { value => sparkConf.set(DRIVER_MEMORY_OVERHEAD_FACTOR, value) }
       val conf = KubernetesTestConf.createDriverConf(
         sparkConf = sparkConf,
         mainAppResource = resource)
@@ -218,10 +218,63 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
       assert(mem === s"${expected}Mi")
 
       val systemProperties = step.getAdditionalPodSystemProperties()
-      assert(systemProperties(MEMORY_OVERHEAD_FACTOR.key) === expectedFactor.toString)
+      assert(systemProperties(DRIVER_MEMORY_OVERHEAD_FACTOR.key) === expectedFactor.toString)
     }
   }
 
+  test(s"SPARK-38194: memory overhead factor precedence") {
+    // Choose a driver memory where the default memory overhead is > MEMORY_OVERHEAD_MIN_MIB
+    val driverMem =
+      ResourceProfile.MEMORY_OVERHEAD_MIN_MIB / DRIVER_MEMORY_OVERHEAD_FACTOR.defaultValue.get * 2
+
+    // main app resource, overhead factor
+    val sparkConf = new SparkConf(false)
+      .set(CONTAINER_IMAGE, "spark-driver:latest")
+      .set(DRIVER_MEMORY.key, s"${driverMem.toInt}m")
+
+    // New config should take precedence
+    val expectedFactor = 0.2
+    sparkConf.set(DRIVER_MEMORY_OVERHEAD_FACTOR, expectedFactor)
+    sparkConf.set(MEMORY_OVERHEAD_FACTOR, 0.3)
+
+    val conf = KubernetesTestConf.createDriverConf(
+      sparkConf = sparkConf)
+    val step = new BasicDriverFeatureStep(conf)
+    val pod = step.configurePod(SparkPod.initialPod())
+    val mem = amountAndFormat(pod.container.getResources.getRequests.get("memory"))
+    val expected = (driverMem + driverMem * expectedFactor).toInt
+    assert(mem === s"${expected}Mi")
+
+    val systemProperties = step.getAdditionalPodSystemProperties()
+    assert(systemProperties(DRIVER_MEMORY_OVERHEAD_FACTOR.key) === expectedFactor.toString)
+  }
+
+  test(s"SPARK-38194: old memory factor settings is applied if new one isn't given") {
+    // Choose a driver memory where the default memory overhead is > MEMORY_OVERHEAD_MIN_MIB
+    val driverMem =
+      ResourceProfile.MEMORY_OVERHEAD_MIN_MIB / DRIVER_MEMORY_OVERHEAD_FACTOR.defaultValue.get * 2
+
+    // main app resource, overhead factor
+    val sparkConf = new SparkConf(false)
+      .set(CONTAINER_IMAGE, "spark-driver:latest")
+      .set(DRIVER_MEMORY.key, s"${driverMem.toInt}m")
+
+    // Old config still works if new config isn't given
+    val expectedFactor = 0.3
+    sparkConf.set(MEMORY_OVERHEAD_FACTOR, expectedFactor)
+
+    val conf = KubernetesTestConf.createDriverConf(
+      sparkConf = sparkConf)
+    val step = new BasicDriverFeatureStep(conf)
+    val pod = step.configurePod(SparkPod.initialPod())
+    val mem = amountAndFormat(pod.container.getResources.getRequests.get("memory"))
+    val expected = (driverMem + driverMem * expectedFactor).toInt
+    assert(mem === s"${expected}Mi")
+
+    val systemProperties = step.getAdditionalPodSystemProperties()
+    assert(systemProperties(DRIVER_MEMORY_OVERHEAD_FACTOR.key) === expectedFactor.toString)
+  }
+
   test("SPARK-35493: make spark.blockManager.port be able to be fallen back to in driver pod") {
     val initPod = SparkPod.initialPod()
     val sparkConf = new SparkConf()
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
index f5f2712..731a9b7 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
@@ -441,6 +441,60 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
     ))
   }
 
+  test(s"SPARK-38194: memory overhead factor precedence") {
+    // Choose an executor memory where the default memory overhead is > MEMORY_OVERHEAD_MIN_MIB
+    val defaultFactor = EXECUTOR_MEMORY_OVERHEAD_FACTOR.defaultValue.get
+    val executorMem = ResourceProfile.MEMORY_OVERHEAD_MIN_MIB / defaultFactor * 2
+
+    // main app resource, overhead factor
+    val sparkConf = new SparkConf(false)
+      .set(CONTAINER_IMAGE, "spark-driver:latest")
+      .set(EXECUTOR_MEMORY.key, s"${executorMem.toInt}m")
+
+    // New config should take precedence
+    val expectedFactor = 0.2
+    sparkConf.set(EXECUTOR_MEMORY_OVERHEAD_FACTOR, expectedFactor)
+    sparkConf.set(MEMORY_OVERHEAD_FACTOR, 0.3)
+
+    val conf = KubernetesTestConf.createExecutorConf(
+      sparkConf = sparkConf)
+    ResourceProfile.clearDefaultProfile()
+    val resourceProfile = ResourceProfile.getOrCreateDefaultProfile(sparkConf)
+    val step = new BasicExecutorFeatureStep(conf, new SecurityManager(baseConf),
+      resourceProfile)
+    val pod = step.configurePod(SparkPod.initialPod())
+    val mem = amountAndFormat(pod.container.getResources.getRequests.get("memory"))
+    val expected = (executorMem + executorMem * expectedFactor).toInt
+    assert(mem === s"${expected}Mi")
+  }
+
+  test(s"SPARK-38194: old memory factor settings is applied if new one isn't given") {
+    // Choose an executor memory where the default memory overhead is > MEMORY_OVERHEAD_MIN_MIB
+    val defaultFactor = EXECUTOR_MEMORY_OVERHEAD_FACTOR.defaultValue.get
+    val executorMem = ResourceProfile.MEMORY_OVERHEAD_MIN_MIB / defaultFactor * 2
+
+    // main app resource, overhead factor
+    val sparkConf = new SparkConf(false)
+      .set(CONTAINER_IMAGE, "spark-driver:latest")
+      .set(EXECUTOR_MEMORY.key, s"${executorMem.toInt}m")
+
+    // Old config still works if new config isn't given
+    val expectedFactor = 0.3
+    sparkConf.set(MEMORY_OVERHEAD_FACTOR, expectedFactor)
+
+    val conf = KubernetesTestConf.createExecutorConf(
+      sparkConf = sparkConf)
+    ResourceProfile.clearDefaultProfile()
+    val resourceProfile = ResourceProfile.getOrCreateDefaultProfile(sparkConf)
+    val step = new BasicExecutorFeatureStep(conf, new SecurityManager(baseConf),
+      resourceProfile)
+    val pod = step.configurePod(SparkPod.initialPod())
+    val mem = amountAndFormat(pod.container.getResources.getRequests.get("memory"))
+    val expected = (executorMem + executorMem * expectedFactor).toInt
+    assert(mem === s"${expected}Mi")
+  }
+
+
   // There is always exactly one controller reference, and it points to the driver pod.
   private def checkOwnerReferences(executor: Pod, driverPodUid: String): Unit = {
     assert(executor.getMetadata.getOwnerReferences.size() === 1)
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala
index 2fd13a5..9e41878 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala
@@ -105,6 +105,7 @@ private[mesos] class MesosSubmitRequestServlet(
     val superviseDriver = sparkProperties.get(config.DRIVER_SUPERVISE.key)
     val driverMemory = sparkProperties.get(config.DRIVER_MEMORY.key)
     val driverMemoryOverhead = sparkProperties.get(config.DRIVER_MEMORY_OVERHEAD.key)
+    val driverMemoryOverheadFactor = sparkProperties.get(config.DRIVER_MEMORY_OVERHEAD_FACTOR.key)
     val driverCores = sparkProperties.get(config.DRIVER_CORES.key)
     val name = request.sparkProperties.getOrElse("spark.app.name", mainClass)
 
@@ -121,8 +122,10 @@ private[mesos] class MesosSubmitRequestServlet(
       mainClass, appArgs, environmentVariables, extraClassPath, extraLibraryPath, javaOpts)
     val actualSuperviseDriver = superviseDriver.map(_.toBoolean).getOrElse(DEFAULT_SUPERVISE)
     val actualDriverMemory = driverMemory.map(Utils.memoryStringToMb).getOrElse(DEFAULT_MEMORY)
+    val actualDriverMemoryFactor = driverMemoryOverheadFactor.map(_.toDouble).getOrElse(
+      MEMORY_OVERHEAD_FACTOR)
     val actualDriverMemoryOverhead = driverMemoryOverhead.map(_.toInt).getOrElse(
-      math.max((MEMORY_OVERHEAD_FACTOR * actualDriverMemory).toInt, MEMORY_OVERHEAD_MIN))
+      math.max((actualDriverMemoryFactor * actualDriverMemory).toInt, MEMORY_OVERHEAD_MIN))
     val actualDriverCores = driverCores.map(_.toDouble).getOrElse(DEFAULT_CORES)
     val submitDate = new Date()
     val submissionId = newDriverId(submitDate)
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
index 38f83df..524b1d5 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
@@ -387,8 +387,7 @@ trait MesosSchedulerUtils extends Logging {
     }
   }
 
-  // These defaults copied from YARN
-  private val MEMORY_OVERHEAD_FRACTION = 0.10
+  // This default copied from YARN
   private val MEMORY_OVERHEAD_MINIMUM = 384
 
   /**
@@ -400,8 +399,9 @@ trait MesosSchedulerUtils extends Logging {
    *         (whichever is larger)
    */
   def executorMemory(sc: SparkContext): Int = {
+    val memoryOverheadFactor = sc.conf.get(EXECUTOR_MEMORY_OVERHEAD_FACTOR)
     sc.conf.get(mesosConfig.EXECUTOR_MEMORY_OVERHEAD).getOrElse(
-      math.max(MEMORY_OVERHEAD_FRACTION * sc.executorMemory, MEMORY_OVERHEAD_MINIMUM).toInt) +
+      math.max(memoryOverheadFactor * sc.executorMemory, MEMORY_OVERHEAD_MINIMUM).toInt) +
       sc.executorMemory
   }
 
@@ -415,7 +415,8 @@ trait MesosSchedulerUtils extends Logging {
    *         `MEMORY_OVERHEAD_FRACTION (=0.1) * driverMemory`
    */
   def driverContainerMemory(driverDesc: MesosDriverDescription): Int = {
-    val defaultMem = math.max(MEMORY_OVERHEAD_FRACTION * driverDesc.mem, MEMORY_OVERHEAD_MINIMUM)
+    val memoryOverheadFactor = driverDesc.conf.get(DRIVER_MEMORY_OVERHEAD_FACTOR)
+    val defaultMem = math.max(memoryOverheadFactor * driverDesc.mem, MEMORY_OVERHEAD_MINIMUM)
     driverDesc.conf.get(mesosConfig.DRIVER_MEMORY_OVERHEAD).getOrElse(defaultMem.toInt) +
       driverDesc.mem
   }
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/deploy/rest/mesos/MesosRestServerSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/deploy/rest/mesos/MesosRestServerSuite.scala
index 344fc38..8bed43a 100644
--- a/resource-managers/mesos/src/test/scala/org/apache/spark/deploy/rest/mesos/MesosRestServerSuite.scala
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/deploy/rest/mesos/MesosRestServerSuite.scala
@@ -35,10 +35,16 @@ class MesosRestServerSuite extends SparkFunSuite
     testOverheadMemory(new SparkConf(), "2000M", 2384)
   }
 
-  test("test driver overhead memory with overhead factor") {
+  test("test driver overhead memory with default overhead factor") {
     testOverheadMemory(new SparkConf(), "5000M", 5500)
   }
 
+  test("test driver overhead memory with overhead factor") {
+    val conf = new SparkConf()
+    conf.set(config.DRIVER_MEMORY_OVERHEAD_FACTOR.key, "0.2")
+    testOverheadMemory(conf, "5000M", 6000)
+  }
+
   test("test configured driver overhead memory") {
     val conf = new SparkConf()
     conf.set(config.DRIVER_MEMORY_OVERHEAD.key, "1000")
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index ae85ea8..f364b79 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -54,6 +54,7 @@ import org.apache.spark.api.python.PythonUtils
 import org.apache.spark.deploy.{SparkApplication, SparkHadoopUtil}
 import org.apache.spark.deploy.security.HadoopDelegationTokenManager
 import org.apache.spark.deploy.yarn.ResourceRequestHelper._
+import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
 import org.apache.spark.deploy.yarn.config._
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
@@ -70,7 +71,6 @@ private[spark] class Client(
   extends Logging {
 
   import Client._
-  import YarnSparkHadoopUtil._
 
   private val yarnClient = YarnClient.createYarnClient
   private val hadoopConf = new YarnConfiguration(SparkHadoopUtil.newConfiguration(sparkConf))
@@ -85,6 +85,12 @@ private[spark] class Client(
   private var appMaster: ApplicationMaster = _
   private var stagingDirPath: Path = _
 
+  private val amMemoryOverheadFactor = if (isClusterMode) {
+    sparkConf.get(DRIVER_MEMORY_OVERHEAD_FACTOR)
+  } else {
+    AM_MEMORY_OVERHEAD_FACTOR
+  }
+
   // AM related configurations
   private val amMemory = if (isClusterMode) {
     sparkConf.get(DRIVER_MEMORY).toInt
@@ -94,7 +100,7 @@ private[spark] class Client(
   private val amMemoryOverhead = {
     val amMemoryOverheadEntry = if (isClusterMode) DRIVER_MEMORY_OVERHEAD else AM_MEMORY_OVERHEAD
     sparkConf.get(amMemoryOverheadEntry).getOrElse(
-      math.max((MEMORY_OVERHEAD_FACTOR * amMemory).toLong,
+      math.max((amMemoryOverheadFactor * amMemory).toLong,
         ResourceProfile.MEMORY_OVERHEAD_MIN_MIB)).toInt
   }
   private val amCores = if (isClusterMode) {
@@ -107,8 +113,10 @@ private[spark] class Client(
   private val executorMemory = sparkConf.get(EXECUTOR_MEMORY)
   // Executor offHeap memory in MiB.
   protected val executorOffHeapMemory = Utils.executorOffHeapMemorySizeAsMb(sparkConf)
+
+  private val executorMemoryOvereadFactor = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD_FACTOR)
   private val executorMemoryOverhead = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD).getOrElse(
-    math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toLong,
+    math.max((executorMemoryOvereadFactor * executorMemory).toLong,
       ResourceProfile.MEMORY_OVERHEAD_MIN_MIB)).toInt
 
   private val isPython = sparkConf.get(IS_PYTHON_APP)
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 54ab643..a85b717 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -163,6 +163,8 @@ private[yarn] class YarnAllocator(
 
   private val isPythonApp = sparkConf.get(IS_PYTHON_APP)
 
+  private val memoryOverheadFactor = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD_FACTOR)
+
   private val launcherPool = ThreadUtils.newDaemonCachedThreadPool(
     "ContainerLauncher", sparkConf.get(CONTAINER_LAUNCH_MAX_THREADS))
 
@@ -280,9 +282,10 @@ private[yarn] class YarnAllocator(
       // track the resource profile if not already there
       getOrUpdateRunningExecutorForRPId(rp.id)
       logInfo(s"Resource profile ${rp.id} doesn't exist, adding it")
+
       val resourcesWithDefaults =
         ResourceProfile.getResourcesForClusterManager(rp.id, rp.executorResources,
-          MEMORY_OVERHEAD_FACTOR, sparkConf, isPythonApp, resourceNameMapping)
+          memoryOverheadFactor, sparkConf, isPythonApp, resourceNameMapping)
       val customSparkResources =
         resourcesWithDefaults.customResources.map { case (name, execReq) =>
           (name, execReq.amount.toString)
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index f347e37..1869c73 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -34,11 +34,10 @@ import org.apache.spark.util.Utils
 
 object YarnSparkHadoopUtil {
 
-  // Additional memory overhead
+  // Additional memory overhead for application masters in client mode.
   // 10% was arrived at experimentally. In the interest of minimizing memory waste while covering
   // the common cases. Memory overhead tends to grow with container size.
-
-  val MEMORY_OVERHEAD_FACTOR = 0.10
+  val AM_MEMORY_OVERHEAD_FACTOR = 0.10
 
   val ANY_HOST = "*"
 
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
index db65d12..ae010f1 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
@@ -706,4 +706,33 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
       sparkConf.set(MEMORY_OFFHEAP_SIZE, originalOffHeapSize)
     }
   }
+
+  test("SPARK-38194: Configurable memory overhead factor") {
+    val executorMemory = sparkConf.get(EXECUTOR_MEMORY).toLong
+    try {
+      sparkConf.set(EXECUTOR_MEMORY_OVERHEAD_FACTOR, 0.5)
+      val (handler, _) = createAllocator(maxExecutors = 1,
+        additionalConfigs = Map(EXECUTOR_MEMORY.key -> executorMemory.toString))
+      val defaultResource = handler.rpIdToYarnResource.get(defaultRPId)
+      val memory = defaultResource.getMemory
+      assert(memory == (executorMemory * 1.5).toLong)
+    } finally {
+      sparkConf.set(EXECUTOR_MEMORY_OVERHEAD_FACTOR, 0.1)
+    }
+  }
+
+  test("SPARK-38194: Memory overhead takes precedence over factor") {
+    val executorMemory = sparkConf.get(EXECUTOR_MEMORY)
+    try {
+      sparkConf.set(EXECUTOR_MEMORY_OVERHEAD_FACTOR, 0.5)
+      sparkConf.set(EXECUTOR_MEMORY_OVERHEAD, (executorMemory * 0.4).toLong)
+      val (handler, _) = createAllocator(maxExecutors = 1,
+        additionalConfigs = Map(EXECUTOR_MEMORY.key -> executorMemory.toString))
+      val defaultResource = handler.rpIdToYarnResource.get(defaultRPId)
+      val memory = defaultResource.getMemory
+      assert(memory == (executorMemory * 1.4).toLong)
+    } finally {
+      sparkConf.set(EXECUTOR_MEMORY_OVERHEAD_FACTOR, 0.1)
+    }
+  }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org