You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2022/03/17 19:57:34 UTC

[spark] branch branch-3.3 updated: Revert "[SPARK-38194][YARN][MESOS][K8S] Make memory overhead factor configurable"

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new cf0afa8  Revert "[SPARK-38194][YARN][MESOS][K8S] Make memory overhead factor configurable"
cf0afa8 is described below

commit cf0afa8619544ab6008fcec8a25891c2ff43625a
Author: Thomas Graves <tg...@nvidia.com>
AuthorDate: Thu Mar 17 12:54:50 2022 -0700

    Revert "[SPARK-38194][YARN][MESOS][K8S] Make memory overhead factor configurable"
    
    ### What changes were proposed in this pull request?
    
    This reverts commit 8405ec352dbed6a3199fc2af3c60fae7186d15b5.
    
    ### Why are the changes needed?
    
    The original PR broke K8s integration tests, so let's revert in branch-3.3 for now and fix on master.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Pass the CI. The K8s integration tests (IT) are recovered, as shown below.
    
    ```
    [info] KubernetesSuite:
    [info] - Run SparkPi with no resources (9 seconds, 832 milliseconds)
    [info] - Run SparkPi with no resources & statefulset allocation (9 seconds, 715 milliseconds)
    [info] - Run SparkPi with a very long application name. (8 seconds, 672 milliseconds)
    [info] - Use SparkLauncher.NO_RESOURCE (9 seconds, 614 milliseconds)
    [info] - Run SparkPi with a master URL without a scheme. (9 seconds, 616 milliseconds)
    [info] - Run SparkPi with an argument. (8 seconds, 633 milliseconds)
    [info] - Run SparkPi with custom labels, annotations, and environment variables. (8 seconds, 631 milliseconds)
    [info] - All pods have the same service account by default (8 seconds, 625 milliseconds)
    [info] - Run extraJVMOptions check on driver (4 seconds, 639 milliseconds)
    [info] - Run SparkRemoteFileTest using a remote data file (8 seconds, 699 milliseconds)
    [info] - Verify logging configuration is picked from the provided SPARK_CONF_DIR/log4j2.properties (14 seconds, 31 milliseconds)
    [info] - Run SparkPi with env and mount secrets. (17 seconds, 878 milliseconds)
    [info] - Run PySpark on simple pi.py example (9 seconds, 642 milliseconds)
    [info] - Run PySpark to test a pyfiles example (11 seconds, 883 milliseconds)
    [info] - Run PySpark with memory customization (9 seconds, 602 milliseconds)
    [info] - Run in client mode. (6 seconds, 303 milliseconds)
    [info] - Start pod creation from template (8 seconds, 864 milliseconds)
    [info] - SPARK-38398: Schedule pod creation from template (8 seconds, 665 milliseconds)
    [info] - Test basic decommissioning (41 seconds, 74 milliseconds)
    [info] - Test basic decommissioning with shuffle cleanup (41 seconds, 318 milliseconds)
    [info] - Test decommissioning with dynamic allocation & shuffle cleanups (2 minutes, 40 seconds)
    [info] - Test decommissioning timeouts (41 seconds, 892 milliseconds)
    [info] - SPARK-37576: Rolling decommissioning (1 minute, 7 seconds)
    [info] - Run SparkR on simple dataframe.R example (11 seconds, 643 milliseconds)
    [info] VolcanoSuite:
    [info] - Run SparkPi with no resources (9 seconds, 585 milliseconds)
    [info] - Run SparkPi with no resources & statefulset allocation (10 seconds, 607 milliseconds)
    [info] - Run SparkPi with a very long application name. (9 seconds, 636 milliseconds)
    [info] - Use SparkLauncher.NO_RESOURCE (10 seconds, 681 milliseconds)
    [info] - Run SparkPi with a master URL without a scheme. (10 seconds, 628 milliseconds)
    [info] - Run SparkPi with an argument. (9 seconds, 638 milliseconds)
    [info] - Run SparkPi with custom labels, annotations, and environment variables. (9 seconds, 626 milliseconds)
    [info] - All pods have the same service account by default (10 seconds, 615 milliseconds)
    [info] - Run extraJVMOptions check on driver (4 seconds, 590 milliseconds)
    [info] - Run SparkRemoteFileTest using a remote data file (9 seconds, 660 milliseconds)
    [info] - Verify logging configuration is picked from the provided SPARK_CONF_DIR/log4j2.properties (15 seconds, 277 milliseconds)
    [info] - Run SparkPi with env and mount secrets. (19 seconds, 300 milliseconds)
    [info] - Run PySpark on simple pi.py example (10 seconds, 641 milliseconds)
    [info] - Run PySpark to test a pyfiles example (12 seconds, 656 milliseconds)
    [info] - Run PySpark with memory customization (10 seconds, 599 milliseconds)
    [info] - Run in client mode. (7 seconds, 258 milliseconds)
    [info] - Start pod creation from template (10 seconds, 664 milliseconds)
    [info] - SPARK-38398: Schedule pod creation from template (10 seconds, 891 milliseconds)
    [info] - Test basic decommissioning (42 seconds, 85 milliseconds)
    [info] - Test basic decommissioning with shuffle cleanup (42 seconds, 384 milliseconds)
    [info] - Test decommissioning with dynamic allocation & shuffle cleanups (2 minutes, 42 seconds)
    [info] - Test decommissioning timeouts (42 seconds, 725 milliseconds)
    [info] - SPARK-37576: Rolling decommissioning (1 minute, 8 seconds)
    [info] - Run SparkR on simple dataframe.R example (12 seconds, 641 milliseconds)
    [info] - Run SparkPi with volcano scheduler (10 seconds, 652 milliseconds)
    [info] - SPARK-38187: Run SparkPi Jobs with minCPU (27 seconds, 590 milliseconds)
    [info] - SPARK-38187: Run SparkPi Jobs with minMemory (29 seconds, 600 milliseconds)
    [info] - SPARK-38188: Run SparkPi jobs with 2 queues (only 1 enabled) (13 seconds, 228 milliseconds)
    [info] - SPARK-38188: Run SparkPi jobs with 2 queues (all enabled) (22 seconds, 329 milliseconds)
    [info] - SPARK-38423: Run driver job to validate priority order (15 seconds, 367 milliseconds)
    [info] Run completed in 28 minutes, 52 seconds.
    [info] Total number of tests run: 54
    [info] Suites: completed 2, aborted 0
    [info] Tests: succeeded 54, failed 0, canceled 0, ignored 0, pending 0
    [info] All tests passed.
    [success] Total time: 1881 s (31:21), completed Mar 17, 2022 11:55:25 AM
    ```
    
    Closes #35900 from tgravescs/revertoverhead.
    
    Authored-by: Thomas Graves <tg...@nvidia.com>
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
 .../main/scala/org/apache/spark/SparkConf.scala    |  4 +-
 .../org/apache/spark/internal/config/package.scala | 28 ----------
 docs/configuration.md                              | 30 +----------
 docs/running-on-kubernetes.md                      |  9 ++++
 .../k8s/features/BasicDriverFeatureStep.scala      | 13 ++---
 .../k8s/features/BasicExecutorFeatureStep.scala    |  7 +--
 .../k8s/features/BasicDriverFeatureStepSuite.scala | 63 ++--------------------
 .../features/BasicExecutorFeatureStepSuite.scala   | 54 -------------------
 .../spark/deploy/rest/mesos/MesosRestServer.scala  |  5 +-
 .../cluster/mesos/MesosSchedulerUtils.scala        |  9 ++--
 .../deploy/rest/mesos/MesosRestServerSuite.scala   |  8 +--
 .../org/apache/spark/deploy/yarn/Client.scala      | 14 ++---
 .../apache/spark/deploy/yarn/YarnAllocator.scala   |  5 +-
 .../spark/deploy/yarn/YarnSparkHadoopUtil.scala    |  5 +-
 .../spark/deploy/yarn/YarnAllocatorSuite.scala     | 29 ----------
 15 files changed, 35 insertions(+), 248 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index cf12174..5f37a1a 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -636,9 +636,7 @@ private[spark] object SparkConf extends Logging {
       DeprecatedConfig("spark.blacklist.killBlacklistedExecutors", "3.1.0",
         "Please use spark.excludeOnFailure.killExcludedExecutors"),
       DeprecatedConfig("spark.yarn.blacklist.executor.launch.blacklisting.enabled", "3.1.0",
-        "Please use spark.yarn.executor.launch.excludeOnFailure.enabled"),
-      DeprecatedConfig("spark.kubernetes.memoryOverheadFactor", "3.3.0",
-        "Please use spark.driver.memoryOverheadFactor and spark.executor.memoryOverheadFactor")
+        "Please use spark.yarn.executor.launch.excludeOnFailure.enabled")
     )
 
     Map(configs.map { cfg => (cfg.key -> cfg) } : _*)
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index ffe4501..dbec61a 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -105,22 +105,6 @@ package object config {
     .bytesConf(ByteUnit.MiB)
     .createOptional
 
-  private[spark] val DRIVER_MEMORY_OVERHEAD_FACTOR =
-    ConfigBuilder("spark.driver.memoryOverheadFactor")
-      .doc("Fraction of driver memory to be allocated as additional non-heap memory per driver " +
-        "process in cluster mode. This is memory that accounts for things like VM overheads, " +
-        "interned strings, other native overheads, etc. This tends to grow with the container " +
-        "size. This value defaults to 0.10 except for Kubernetes non-JVM jobs, which defaults to " +
-        "0.40. This is done as non-JVM tasks need more non-JVM heap space and such tasks " +
-        "commonly fail with \"Memory Overhead Exceeded\" errors. This preempts this error " +
-        "with a higher default. This value is ignored if spark.driver.memoryOverhead is set " +
-        "directly.")
-      .version("3.3.0")
-      .doubleConf
-      .checkValue(factor => factor > 0,
-        "Ensure that memory overhead is a double greater than 0")
-      .createWithDefault(0.1)
-
   private[spark] val DRIVER_LOG_DFS_DIR =
     ConfigBuilder("spark.driver.log.dfsDir").version("3.0.0").stringConf.createOptional
 
@@ -331,18 +315,6 @@ package object config {
     .bytesConf(ByteUnit.MiB)
     .createOptional
 
-  private[spark] val EXECUTOR_MEMORY_OVERHEAD_FACTOR =
-    ConfigBuilder("spark.executor.memoryOverheadFactor")
-      .doc("Fraction of executor memory to be allocated as additional non-heap memory per " +
-        "executor process. This is memory that accounts for things like VM overheads, " +
-        "interned strings, other native overheads, etc. This tends to grow with the container " +
-        "size. This value is ignored if spark.executor.memoryOverhead is set directly.")
-      .version("3.3.0")
-      .doubleConf
-      .checkValue(factor => factor > 0,
-        "Ensure that memory overhead is a double greater than 0")
-      .createWithDefault(0.1)
-
   private[spark] val CORES_MAX = ConfigBuilder("spark.cores.max")
     .doc("When running on a standalone deploy cluster or a Mesos cluster in coarse-grained " +
       "sharing mode, the maximum amount of CPU cores to request for the application from across " +
diff --git a/docs/configuration.md b/docs/configuration.md
index a2e6797..ae3f422 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -183,7 +183,7 @@ of the most common options to set are:
 </tr>
 <tr>
   <td><code>spark.driver.memoryOverhead</code></td>
-  <td>driverMemory * <code>spark.driver.memoryOverheadFactor</code>, with minimum of 384 </td>
+  <td>driverMemory * 0.10, with minimum of 384 </td>
   <td>
     Amount of non-heap memory to be allocated per driver process in cluster mode, in MiB unless
     otherwise specified. This is memory that accounts for things like VM overheads, interned strings,
@@ -199,21 +199,6 @@ of the most common options to set are:
   <td>2.3.0</td>
 </tr>
 <tr>
-  <td><code>spark.driver.memoryOverheadFactor</code></td>
-  <td>0.10</td>
-  <td>
-    Fraction of driver memory to be allocated as additional non-heap memory per driver process in cluster mode.
-    This is memory that accounts for things like VM overheads, interned strings,
-    other native overheads, etc. This tends to grow with the container size.
-    This value defaults to 0.10 except for Kubernetes non-JVM jobs, which defaults to
-    0.40. This is done as non-JVM tasks need more non-JVM heap space and such tasks
-    commonly fail with "Memory Overhead Exceeded" errors. This preempts this error
-    with a higher default.
-    This value is ignored if <code>spark.driver.memoryOverhead</code> is set directly.
-  </td>
-  <td>3.3.0</td>
-</tr>
-<tr>
  <td><code>spark.driver.resource.{resourceName}.amount</code></td>
   <td>0</td>
   <td>
@@ -287,7 +272,7 @@ of the most common options to set are:
 </tr>
 <tr>
  <td><code>spark.executor.memoryOverhead</code></td>
-  <td>executorMemory * <code>spark.executor.memoryOverheadFactor</code>, with minimum of 384 </td>
+  <td>executorMemory * 0.10, with minimum of 384 </td>
   <td>
     Amount of additional memory to be allocated per executor process, in MiB unless otherwise specified.
     This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc.
@@ -303,17 +288,6 @@ of the most common options to set are:
   <td>2.3.0</td>
 </tr>
 <tr>
-  <td><code>spark.executor.memoryOverheadFactor</code></td>
-  <td>0.10</td>
-  <td>
-    Fraction of executor memory to be allocated as additional non-heap memory per executor process.
-    This is memory that accounts for things like VM overheads, interned strings,
-    other native overheads, etc. This tends to grow with the container size.
-    This value is ignored if <code>spark.executor.memoryOverhead</code> is set directly.
-  </td>
-  <td>3.3.0</td>
-</tr>
-<tr>
  <td><code>spark.executor.resource.{resourceName}.amount</code></td>
   <td>0</td>
   <td>
diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md
index de37e22..a5da80a 100644
--- a/docs/running-on-kubernetes.md
+++ b/docs/running-on-kubernetes.md
@@ -1138,6 +1138,15 @@ See the [configuration page](configuration.html) for information on Spark config
   <td>3.0.0</td>
 </tr>
 <tr>
+  <td><code>spark.kubernetes.memoryOverheadFactor</code></td>
+  <td><code>0.1</code></td>
+  <td>
+    This sets the Memory Overhead Factor that will allocate memory to non-JVM memory, which includes off-heap memory allocations, non-JVM tasks, various systems processes, and <code>tmpfs</code>-based local directories when <code>spark.kubernetes.local.dirs.tmpfs</code> is <code>true</code>. For JVM-based jobs this value will default to 0.10 and 0.40 for non-JVM jobs.
+    This is done as non-JVM tasks need more non-JVM heap space and such tasks commonly fail with "Memory Overhead Exceeded" errors. This preempts this error with a higher default.
+  </td>
+  <td>2.4.0</td>
+</tr>
+<tr>
   <td><code>spark.kubernetes.pyspark.pythonVersion</code></td>
   <td><code>"3"</code></td>
   <td>
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
index 9715149..3b2b561 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
@@ -53,23 +53,18 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf)
 
   // Memory settings
   private val driverMemoryMiB = conf.get(DRIVER_MEMORY)
-  private val memoryOverheadFactor = if (conf.contains(DRIVER_MEMORY_OVERHEAD_FACTOR)) {
-    conf.get(DRIVER_MEMORY_OVERHEAD_FACTOR)
-  } else {
-    conf.get(MEMORY_OVERHEAD_FACTOR)
-  }
 
   // The memory overhead factor to use. If the user has not set it, then use a different
   // value for non-JVM apps. This value is propagated to executors.
   private val overheadFactor =
     if (conf.mainAppResource.isInstanceOf[NonJVMResource]) {
-      if (conf.contains(MEMORY_OVERHEAD_FACTOR) || conf.contains(DRIVER_MEMORY_OVERHEAD_FACTOR)) {
-        memoryOverheadFactor
+      if (conf.contains(MEMORY_OVERHEAD_FACTOR)) {
+        conf.get(MEMORY_OVERHEAD_FACTOR)
       } else {
         NON_JVM_MEMORY_OVERHEAD_FACTOR
       }
     } else {
-      memoryOverheadFactor
+      conf.get(MEMORY_OVERHEAD_FACTOR)
     }
 
   private val memoryOverheadMiB = conf
@@ -169,7 +164,7 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf)
       KUBERNETES_DRIVER_POD_NAME.key -> driverPodName,
       "spark.app.id" -> conf.appId,
       KUBERNETES_DRIVER_SUBMIT_CHECK.key -> "true",
-      DRIVER_MEMORY_OVERHEAD_FACTOR.key -> overheadFactor.toString)
+      MEMORY_OVERHEAD_FACTOR.key -> overheadFactor.toString)
     // try upload local, resolvable files to a hadoop compatible file system
     Seq(JARS, FILES, ARCHIVES, SUBMIT_PYTHON_FILES).foreach { key =>
       val uris = conf.get(key).filter(uri => KubernetesUtils.isLocalAndResolvable(uri))
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
index 15c69ad..a762519 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
@@ -59,16 +59,11 @@ private[spark] class BasicExecutorFeatureStep(
   private val isDefaultProfile = resourceProfile.id == ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
   private val isPythonApp = kubernetesConf.get(APP_RESOURCE_TYPE) == Some(APP_RESOURCE_TYPE_PYTHON)
   private val disableConfigMap = kubernetesConf.get(KUBERNETES_EXECUTOR_DISABLE_CONFIGMAP)
-  private val memoryOverheadFactor = if (kubernetesConf.contains(EXECUTOR_MEMORY_OVERHEAD_FACTOR)) {
-    kubernetesConf.get(EXECUTOR_MEMORY_OVERHEAD_FACTOR)
-  } else {
-    kubernetesConf.get(MEMORY_OVERHEAD_FACTOR)
-  }
 
   val execResources = ResourceProfile.getResourcesForClusterManager(
     resourceProfile.id,
     resourceProfile.executorResources,
-    memoryOverheadFactor,
+    kubernetesConf.get(MEMORY_OVERHEAD_FACTOR),
     kubernetesConf.sparkConf,
     isPythonApp,
     Map.empty)
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
index d45f5f9..bf7fbcc 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
@@ -134,7 +134,7 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
       KUBERNETES_DRIVER_POD_NAME.key -> "spark-driver-pod",
       "spark.app.id" -> KubernetesTestConf.APP_ID,
       "spark.kubernetes.submitInDriver" -> "true",
-      DRIVER_MEMORY_OVERHEAD_FACTOR.key -> DRIVER_MEMORY_OVERHEAD_FACTOR.defaultValue.get.toString)
+      MEMORY_OVERHEAD_FACTOR.key -> MEMORY_OVERHEAD_FACTOR.defaultValue.get.toString)
     assert(featureStep.getAdditionalPodSystemProperties() === expectedSparkConf)
   }
 
@@ -193,7 +193,7 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
   // Memory overhead tests. Tuples are:
   //   test name, main resource, overhead factor, expected factor
   Seq(
-    ("java", JavaMainAppResource(None), None, DRIVER_MEMORY_OVERHEAD_FACTOR.defaultValue.get),
+    ("java", JavaMainAppResource(None), None, MEMORY_OVERHEAD_FACTOR.defaultValue.get),
     ("python default", PythonMainAppResource(null), None, NON_JVM_MEMORY_OVERHEAD_FACTOR),
     ("python w/ override", PythonMainAppResource(null), Some(0.9d), 0.9d),
     ("r default", RMainAppResource(null), None, NON_JVM_MEMORY_OVERHEAD_FACTOR)
@@ -201,13 +201,13 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
     test(s"memory overhead factor: $name") {
       // Choose a driver memory where the default memory overhead is > MEMORY_OVERHEAD_MIN_MIB
       val driverMem =
-        ResourceProfile.MEMORY_OVERHEAD_MIN_MIB / DRIVER_MEMORY_OVERHEAD_FACTOR.defaultValue.get * 2
+        ResourceProfile.MEMORY_OVERHEAD_MIN_MIB / MEMORY_OVERHEAD_FACTOR.defaultValue.get * 2
 
       // main app resource, overhead factor
       val sparkConf = new SparkConf(false)
         .set(CONTAINER_IMAGE, "spark-driver:latest")
         .set(DRIVER_MEMORY.key, s"${driverMem.toInt}m")
-      factor.foreach { value => sparkConf.set(DRIVER_MEMORY_OVERHEAD_FACTOR, value) }
+      factor.foreach { value => sparkConf.set(MEMORY_OVERHEAD_FACTOR, value) }
       val conf = KubernetesTestConf.createDriverConf(
         sparkConf = sparkConf,
         mainAppResource = resource)
@@ -218,63 +218,10 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
       assert(mem === s"${expected}Mi")
 
       val systemProperties = step.getAdditionalPodSystemProperties()
-      assert(systemProperties(DRIVER_MEMORY_OVERHEAD_FACTOR.key) === expectedFactor.toString)
+      assert(systemProperties(MEMORY_OVERHEAD_FACTOR.key) === expectedFactor.toString)
     }
   }
 
-  test(s"SPARK-38194: memory overhead factor precendence") {
-    // Choose a driver memory where the default memory overhead is > MEMORY_OVERHEAD_MIN_MIB
-    val driverMem =
-      ResourceProfile.MEMORY_OVERHEAD_MIN_MIB / DRIVER_MEMORY_OVERHEAD_FACTOR.defaultValue.get * 2
-
-    // main app resource, overhead factor
-    val sparkConf = new SparkConf(false)
-      .set(CONTAINER_IMAGE, "spark-driver:latest")
-      .set(DRIVER_MEMORY.key, s"${driverMem.toInt}m")
-
-    // New config should take precedence
-    val expectedFactor = 0.2
-    sparkConf.set(DRIVER_MEMORY_OVERHEAD_FACTOR, expectedFactor)
-    sparkConf.set(MEMORY_OVERHEAD_FACTOR, 0.3)
-
-    val conf = KubernetesTestConf.createDriverConf(
-      sparkConf = sparkConf)
-    val step = new BasicDriverFeatureStep(conf)
-    val pod = step.configurePod(SparkPod.initialPod())
-    val mem = amountAndFormat(pod.container.getResources.getRequests.get("memory"))
-    val expected = (driverMem + driverMem * expectedFactor).toInt
-    assert(mem === s"${expected}Mi")
-
-    val systemProperties = step.getAdditionalPodSystemProperties()
-    assert(systemProperties(DRIVER_MEMORY_OVERHEAD_FACTOR.key) === expectedFactor.toString)
-  }
-
-  test(s"SPARK-38194: old memory factor settings is applied if new one isn't given") {
-    // Choose a driver memory where the default memory overhead is > MEMORY_OVERHEAD_MIN_MIB
-    val driverMem =
-      ResourceProfile.MEMORY_OVERHEAD_MIN_MIB / DRIVER_MEMORY_OVERHEAD_FACTOR.defaultValue.get * 2
-
-    // main app resource, overhead factor
-    val sparkConf = new SparkConf(false)
-      .set(CONTAINER_IMAGE, "spark-driver:latest")
-      .set(DRIVER_MEMORY.key, s"${driverMem.toInt}m")
-
-    // Old config still works if new config isn't given
-    val expectedFactor = 0.3
-    sparkConf.set(MEMORY_OVERHEAD_FACTOR, expectedFactor)
-
-    val conf = KubernetesTestConf.createDriverConf(
-      sparkConf = sparkConf)
-    val step = new BasicDriverFeatureStep(conf)
-    val pod = step.configurePod(SparkPod.initialPod())
-    val mem = amountAndFormat(pod.container.getResources.getRequests.get("memory"))
-    val expected = (driverMem + driverMem * expectedFactor).toInt
-    assert(mem === s"${expected}Mi")
-
-    val systemProperties = step.getAdditionalPodSystemProperties()
-    assert(systemProperties(DRIVER_MEMORY_OVERHEAD_FACTOR.key) === expectedFactor.toString)
-  }
-
   test("SPARK-35493: make spark.blockManager.port be able to be fallen back to in driver pod") {
     val initPod = SparkPod.initialPod()
     val sparkConf = new SparkConf()
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
index 731a9b7..f5f2712 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
@@ -441,60 +441,6 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
     ))
   }
 
-  test(s"SPARK-38194: memory overhead factor precendence") {
-    // Choose an executor memory where the default memory overhead is > MEMORY_OVERHEAD_MIN_MIB
-    val defaultFactor = EXECUTOR_MEMORY_OVERHEAD_FACTOR.defaultValue.get
-    val executorMem = ResourceProfile.MEMORY_OVERHEAD_MIN_MIB / defaultFactor * 2
-
-    // main app resource, overhead factor
-    val sparkConf = new SparkConf(false)
-      .set(CONTAINER_IMAGE, "spark-driver:latest")
-      .set(EXECUTOR_MEMORY.key, s"${executorMem.toInt}m")
-
-    // New config should take precedence
-    val expectedFactor = 0.2
-    sparkConf.set(EXECUTOR_MEMORY_OVERHEAD_FACTOR, expectedFactor)
-    sparkConf.set(MEMORY_OVERHEAD_FACTOR, 0.3)
-
-    val conf = KubernetesTestConf.createExecutorConf(
-      sparkConf = sparkConf)
-    ResourceProfile.clearDefaultProfile()
-    val resourceProfile = ResourceProfile.getOrCreateDefaultProfile(sparkConf)
-    val step = new BasicExecutorFeatureStep(conf, new SecurityManager(baseConf),
-      resourceProfile)
-    val pod = step.configurePod(SparkPod.initialPod())
-    val mem = amountAndFormat(pod.container.getResources.getRequests.get("memory"))
-    val expected = (executorMem + executorMem * expectedFactor).toInt
-    assert(mem === s"${expected}Mi")
-  }
-
-  test(s"SPARK-38194: old memory factor settings is applied if new one isn't given") {
-    // Choose an executor memory where the default memory overhead is > MEMORY_OVERHEAD_MIN_MIB
-    val defaultFactor = EXECUTOR_MEMORY_OVERHEAD_FACTOR.defaultValue.get
-    val executorMem = ResourceProfile.MEMORY_OVERHEAD_MIN_MIB / defaultFactor * 2
-
-    // main app resource, overhead factor
-    val sparkConf = new SparkConf(false)
-      .set(CONTAINER_IMAGE, "spark-driver:latest")
-      .set(EXECUTOR_MEMORY.key, s"${executorMem.toInt}m")
-
-    // New config should take precedence
-    val expectedFactor = 0.3
-    sparkConf.set(MEMORY_OVERHEAD_FACTOR, expectedFactor)
-
-    val conf = KubernetesTestConf.createExecutorConf(
-      sparkConf = sparkConf)
-    ResourceProfile.clearDefaultProfile()
-    val resourceProfile = ResourceProfile.getOrCreateDefaultProfile(sparkConf)
-    val step = new BasicExecutorFeatureStep(conf, new SecurityManager(baseConf),
-      resourceProfile)
-    val pod = step.configurePod(SparkPod.initialPod())
-    val mem = amountAndFormat(pod.container.getResources.getRequests.get("memory"))
-    val expected = (executorMem + executorMem * expectedFactor).toInt
-    assert(mem === s"${expected}Mi")
-  }
-
-
   // There is always exactly one controller reference, and it points to the driver pod.
   private def checkOwnerReferences(executor: Pod, driverPodUid: String): Unit = {
     assert(executor.getMetadata.getOwnerReferences.size() === 1)
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala
index 9e41878..2fd13a5 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala
@@ -105,7 +105,6 @@ private[mesos] class MesosSubmitRequestServlet(
     val superviseDriver = sparkProperties.get(config.DRIVER_SUPERVISE.key)
     val driverMemory = sparkProperties.get(config.DRIVER_MEMORY.key)
     val driverMemoryOverhead = sparkProperties.get(config.DRIVER_MEMORY_OVERHEAD.key)
-    val driverMemoryOverheadFactor = sparkProperties.get(config.DRIVER_MEMORY_OVERHEAD_FACTOR.key)
     val driverCores = sparkProperties.get(config.DRIVER_CORES.key)
     val name = request.sparkProperties.getOrElse("spark.app.name", mainClass)
 
@@ -122,10 +121,8 @@ private[mesos] class MesosSubmitRequestServlet(
       mainClass, appArgs, environmentVariables, extraClassPath, extraLibraryPath, javaOpts)
     val actualSuperviseDriver = superviseDriver.map(_.toBoolean).getOrElse(DEFAULT_SUPERVISE)
     val actualDriverMemory = driverMemory.map(Utils.memoryStringToMb).getOrElse(DEFAULT_MEMORY)
-    val actualDriverMemoryFactor = driverMemoryOverheadFactor.map(_.toDouble).getOrElse(
-      MEMORY_OVERHEAD_FACTOR)
     val actualDriverMemoryOverhead = driverMemoryOverhead.map(_.toInt).getOrElse(
-      math.max((actualDriverMemoryFactor * actualDriverMemory).toInt, MEMORY_OVERHEAD_MIN))
+      math.max((MEMORY_OVERHEAD_FACTOR * actualDriverMemory).toInt, MEMORY_OVERHEAD_MIN))
     val actualDriverCores = driverCores.map(_.toDouble).getOrElse(DEFAULT_CORES)
     val submitDate = new Date()
     val submissionId = newDriverId(submitDate)
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
index 524b1d5..38f83df 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
@@ -387,7 +387,8 @@ trait MesosSchedulerUtils extends Logging {
     }
   }
 
-  // This default copied from YARN
+  // These defaults copied from YARN
+  private val MEMORY_OVERHEAD_FRACTION = 0.10
   private val MEMORY_OVERHEAD_MINIMUM = 384
 
   /**
@@ -399,9 +400,8 @@ trait MesosSchedulerUtils extends Logging {
    *         (whichever is larger)
    */
   def executorMemory(sc: SparkContext): Int = {
-    val memoryOverheadFactor = sc.conf.get(EXECUTOR_MEMORY_OVERHEAD_FACTOR)
     sc.conf.get(mesosConfig.EXECUTOR_MEMORY_OVERHEAD).getOrElse(
-      math.max(memoryOverheadFactor * sc.executorMemory, MEMORY_OVERHEAD_MINIMUM).toInt) +
+      math.max(MEMORY_OVERHEAD_FRACTION * sc.executorMemory, MEMORY_OVERHEAD_MINIMUM).toInt) +
       sc.executorMemory
   }
 
@@ -415,8 +415,7 @@ trait MesosSchedulerUtils extends Logging {
    *         `MEMORY_OVERHEAD_FRACTION (=0.1) * driverMemory`
    */
   def driverContainerMemory(driverDesc: MesosDriverDescription): Int = {
-    val memoryOverheadFactor = driverDesc.conf.get(DRIVER_MEMORY_OVERHEAD_FACTOR)
-    val defaultMem = math.max(memoryOverheadFactor * driverDesc.mem, MEMORY_OVERHEAD_MINIMUM)
+    val defaultMem = math.max(MEMORY_OVERHEAD_FRACTION * driverDesc.mem, MEMORY_OVERHEAD_MINIMUM)
     driverDesc.conf.get(mesosConfig.DRIVER_MEMORY_OVERHEAD).getOrElse(defaultMem.toInt) +
       driverDesc.mem
   }
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/deploy/rest/mesos/MesosRestServerSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/deploy/rest/mesos/MesosRestServerSuite.scala
index 8bed43a..344fc38 100644
--- a/resource-managers/mesos/src/test/scala/org/apache/spark/deploy/rest/mesos/MesosRestServerSuite.scala
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/deploy/rest/mesos/MesosRestServerSuite.scala
@@ -35,14 +35,8 @@ class MesosRestServerSuite extends SparkFunSuite
     testOverheadMemory(new SparkConf(), "2000M", 2384)
   }
 
-  test("test driver overhead memory with default overhead factor") {
-    testOverheadMemory(new SparkConf(), "5000M", 5500)
-  }
-
   test("test driver overhead memory with overhead factor") {
-    val conf = new SparkConf()
-    conf.set(config.DRIVER_MEMORY_OVERHEAD_FACTOR.key, "0.2")
-    testOverheadMemory(conf, "5000M", 6000)
+    testOverheadMemory(new SparkConf(), "5000M", 5500)
   }
 
   test("test configured driver overhead memory") {
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index f364b79..ae85ea8 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -54,7 +54,6 @@ import org.apache.spark.api.python.PythonUtils
 import org.apache.spark.deploy.{SparkApplication, SparkHadoopUtil}
 import org.apache.spark.deploy.security.HadoopDelegationTokenManager
 import org.apache.spark.deploy.yarn.ResourceRequestHelper._
-import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
 import org.apache.spark.deploy.yarn.config._
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
@@ -71,6 +70,7 @@ private[spark] class Client(
   extends Logging {
 
   import Client._
+  import YarnSparkHadoopUtil._
 
   private val yarnClient = YarnClient.createYarnClient
   private val hadoopConf = new YarnConfiguration(SparkHadoopUtil.newConfiguration(sparkConf))
@@ -85,12 +85,6 @@ private[spark] class Client(
   private var appMaster: ApplicationMaster = _
   private var stagingDirPath: Path = _
 
-  private val amMemoryOverheadFactor = if (isClusterMode) {
-    sparkConf.get(DRIVER_MEMORY_OVERHEAD_FACTOR)
-  } else {
-    AM_MEMORY_OVERHEAD_FACTOR
-  }
-
   // AM related configurations
   private val amMemory = if (isClusterMode) {
     sparkConf.get(DRIVER_MEMORY).toInt
@@ -100,7 +94,7 @@ private[spark] class Client(
   private val amMemoryOverhead = {
     val amMemoryOverheadEntry = if (isClusterMode) DRIVER_MEMORY_OVERHEAD else AM_MEMORY_OVERHEAD
     sparkConf.get(amMemoryOverheadEntry).getOrElse(
-      math.max((amMemoryOverheadFactor * amMemory).toLong,
+      math.max((MEMORY_OVERHEAD_FACTOR * amMemory).toLong,
         ResourceProfile.MEMORY_OVERHEAD_MIN_MIB)).toInt
   }
   private val amCores = if (isClusterMode) {
@@ -113,10 +107,8 @@ private[spark] class Client(
   private val executorMemory = sparkConf.get(EXECUTOR_MEMORY)
   // Executor offHeap memory in MiB.
   protected val executorOffHeapMemory = Utils.executorOffHeapMemorySizeAsMb(sparkConf)
-
-  private val executorMemoryOvereadFactor = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD_FACTOR)
   private val executorMemoryOverhead = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD).getOrElse(
-    math.max((executorMemoryOvereadFactor * executorMemory).toLong,
+    math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toLong,
       ResourceProfile.MEMORY_OVERHEAD_MIN_MIB)).toInt
 
   private val isPython = sparkConf.get(IS_PYTHON_APP)
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index a85b717..54ab643 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -163,8 +163,6 @@ private[yarn] class YarnAllocator(
 
   private val isPythonApp = sparkConf.get(IS_PYTHON_APP)
 
-  private val memoryOverheadFactor = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD_FACTOR)
-
   private val launcherPool = ThreadUtils.newDaemonCachedThreadPool(
     "ContainerLauncher", sparkConf.get(CONTAINER_LAUNCH_MAX_THREADS))
 
@@ -282,10 +280,9 @@ private[yarn] class YarnAllocator(
       // track the resource profile if not already there
       getOrUpdateRunningExecutorForRPId(rp.id)
       logInfo(s"Resource profile ${rp.id} doesn't exist, adding it")
-
       val resourcesWithDefaults =
         ResourceProfile.getResourcesForClusterManager(rp.id, rp.executorResources,
-          memoryOverheadFactor, sparkConf, isPythonApp, resourceNameMapping)
+          MEMORY_OVERHEAD_FACTOR, sparkConf, isPythonApp, resourceNameMapping)
       val customSparkResources =
         resourcesWithDefaults.customResources.map { case (name, execReq) =>
           (name, execReq.amount.toString)
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index 1869c73..f347e37 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -34,10 +34,11 @@ import org.apache.spark.util.Utils
 
 object YarnSparkHadoopUtil {
 
-  // Additional memory overhead for application masters in client mode.
+  // Additional memory overhead
   // 10% was arrived at experimentally. In the interest of minimizing memory waste while covering
   // the common cases. Memory overhead tends to grow with container size.
-  val AM_MEMORY_OVERHEAD_FACTOR = 0.10
+
+  val MEMORY_OVERHEAD_FACTOR = 0.10
 
   val ANY_HOST = "*"
 
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
index ae010f1..db65d12 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
@@ -706,33 +706,4 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
       sparkConf.set(MEMORY_OFFHEAP_SIZE, originalOffHeapSize)
     }
   }
-
-  test("SPARK-38194: Configurable memory overhead factor") {
-    val executorMemory = sparkConf.get(EXECUTOR_MEMORY).toLong
-    try {
-      sparkConf.set(EXECUTOR_MEMORY_OVERHEAD_FACTOR, 0.5)
-      val (handler, _) = createAllocator(maxExecutors = 1,
-        additionalConfigs = Map(EXECUTOR_MEMORY.key -> executorMemory.toString))
-      val defaultResource = handler.rpIdToYarnResource.get(defaultRPId)
-      val memory = defaultResource.getMemory
-      assert(memory == (executorMemory * 1.5).toLong)
-    } finally {
-      sparkConf.set(EXECUTOR_MEMORY_OVERHEAD_FACTOR, 0.1)
-    }
-  }
-
-  test("SPARK-38194: Memory overhead takes precedence over factor") {
-    val executorMemory = sparkConf.get(EXECUTOR_MEMORY)
-    try {
-      sparkConf.set(EXECUTOR_MEMORY_OVERHEAD_FACTOR, 0.5)
-      sparkConf.set(EXECUTOR_MEMORY_OVERHEAD, (executorMemory * 0.4).toLong)
-      val (handler, _) = createAllocator(maxExecutors = 1,
-        additionalConfigs = Map(EXECUTOR_MEMORY.key -> executorMemory.toString))
-      val defaultResource = handler.rpIdToYarnResource.get(defaultRPId)
-      val memory = defaultResource.getMemory
-      assert(memory == (executorMemory * 1.4).toLong)
-    } finally {
-      sparkConf.set(EXECUTOR_MEMORY_OVERHEAD_FACTOR, 0.1)
-    }
-  }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org