You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by tg...@apache.org on 2019/09/04 14:01:01 UTC

[spark] branch master updated: [SPARK-28577][YARN] Resource capability requested for each executor add offHeapMemorySize

This is an automated email from the ASF dual-hosted git repository.

tgraves pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new a07f795  [SPARK-28577][YARN] Resource capability requested for each executor add offHeapMemorySize
a07f795 is described below

commit a07f795aead3bd81e7cccad30a7f6148c09ed8ad
Author: yangjie01 <ya...@baidu.com>
AuthorDate: Wed Sep 4 09:00:12 2019 -0500

    [SPARK-28577][YARN] Resource capability requested for each executor add offHeapMemorySize
    
    ## What changes were proposed in this pull request?
    
    If MEMORY_OFFHEAP_ENABLED is true, add MEMORY_OFFHEAP_SIZE to resource requested for executor to ensure instance has enough memory to use.
    
    In this pr add a helper method `executorOffHeapMemorySizeAsMb` in `YarnSparkHadoopUtil`.
    
    ## How was this patch tested?
    Add 3 new test suite to test `YarnSparkHadoopUtil#executorOffHeapMemorySizeAsMb`
    
    Closes #25309 from LuciferYang/spark-28577.
    
    Authored-by: yangjie01 <ya...@baidu.com>
    Signed-off-by: Thomas Graves <tg...@apache.org>
---
 docs/configuration.md                              | 17 ++++++-------
 .../org/apache/spark/deploy/yarn/Client.scala      | 14 +++++++----
 .../apache/spark/deploy/yarn/YarnAllocator.scala   |  4 +++-
 .../spark/deploy/yarn/YarnSparkHadoopUtil.scala    | 16 +++++++++++--
 .../spark/deploy/yarn/YarnAllocatorSuite.scala     | 21 ++++++++++++++++
 .../deploy/yarn/YarnSparkHadoopUtilSuite.scala     | 28 ++++++++++++++++++++++
 6 files changed, 82 insertions(+), 18 deletions(-)

diff --git a/docs/configuration.md b/docs/configuration.md
index aad496d..9933283 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -271,17 +271,17 @@ of the most common options to set are:
  <td><code>spark.executor.memoryOverhead</code></td>
   <td>executorMemory * 0.10, with minimum of 384 </td>
   <td>
-    Amount of non-heap memory to be allocated per executor process in cluster mode, in MiB unless
+    Amount of additional memory to be allocated per executor process in cluster mode, in MiB unless
     otherwise specified. This is memory that accounts for things like VM overheads, interned strings,
     other native overheads, etc. This tends to grow with the executor size (typically 6-10%).
     This option is currently supported on YARN and Kubernetes.
     <br/>
-    <em>Note:</em> Non-heap memory includes off-heap memory 
-    (when <code>spark.memory.offHeap.enabled=true</code>) and memory used by other executor processes
-    (e.g. python process that goes with a PySpark executor) and memory used by other non-executor 
-    processes running in the same container. The maximum memory size of container to running executor 
-    is determined by the sum of <code>spark.executor.memoryOverhead</code> and 
-    <code>spark.executor.memory</code>.
+    <em>Note:</em> Additional memory includes PySpark executor memory 
+    (when <code>spark.executor.pyspark.memory</code> is not configured) and memory used by other
+    non-executor processes running in the same container. The maximum memory size of container to 
+    running executor is determined by the sum of <code>spark.executor.memoryOverhead</code>, 
+    <code>spark.executor.memory</code>, <code>spark.memory.offHeap.size</code> and 
+    <code>spark.executor.pyspark.memory</code>.
   </td>
 </tr>
 <tr>
@@ -1378,9 +1378,6 @@ Apart from these, the following properties are also available, and may be useful
   <td>
     If true, Spark will attempt to use off-heap memory for certain operations. If off-heap memory 
     use is enabled, then <code>spark.memory.offHeap.size</code> must be positive.
-    <em>Note:</em> If off-heap memory is enabled, may need to raise the non-heap memory size
-    (e.g. increase <code>spark.driver.memoryOverhead</code> or
-    <code>spark.executor.memoryOverhead</code>).
   </td>
 </tr>
 <tr>
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 5fc6894..9be3e7b 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -97,6 +97,8 @@ private[spark] class Client(
 
   // Executor related configurations
   private val executorMemory = sparkConf.get(EXECUTOR_MEMORY)
+  // Executor offHeap memory in MiB.
+  protected val executorOffHeapMemory = YarnSparkHadoopUtil.executorOffHeapMemorySizeAsMb(sparkConf)
   private val executorMemoryOverhead = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD).getOrElse(
     math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toLong, MEMORY_OVERHEAD_MIN)).toInt
 
@@ -346,12 +348,14 @@ private[spark] class Client(
     val maxMem = newAppResponse.getMaximumResourceCapability().getMemory()
     logInfo("Verifying our application has not requested more than the maximum " +
       s"memory capability of the cluster ($maxMem MB per container)")
-    val executorMem = executorMemory + executorMemoryOverhead + pysparkWorkerMemory
+    val executorMem =
+      executorMemory + executorOffHeapMemory + executorMemoryOverhead + pysparkWorkerMemory
     if (executorMem > maxMem) {
-      throw new IllegalArgumentException(s"Required executor memory ($executorMemory), overhead " +
-        s"($executorMemoryOverhead MB), and PySpark memory ($pysparkWorkerMemory MB) is above " +
-        s"the max threshold ($maxMem MB) of this cluster! Please check the values of " +
-        s"'yarn.scheduler.maximum-allocation-mb' and/or 'yarn.nodemanager.resource.memory-mb'.")
+      throw new IllegalArgumentException(s"Required executor memory ($executorMemory MB), " +
+        s"offHeap memory ($executorOffHeapMemory) MB, overhead ($executorMemoryOverhead MB), " +
+        s"and PySpark memory ($pysparkWorkerMemory MB) is above the max threshold ($maxMem MB) " +
+        "of this cluster! Please check the values of 'yarn.scheduler.maximum-allocation-mb' " +
+        "and/or 'yarn.nodemanager.resource.memory-mb'.")
     }
     val amMem = amMemory + amMemoryOverhead
     if (amMem > maxMem) {
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 8ec7bd6..f68be33 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -131,6 +131,8 @@ private[yarn] class YarnAllocator(
 
   // Executor memory in MiB.
   protected val executorMemory = sparkConf.get(EXECUTOR_MEMORY).toInt
+  // Executor offHeap memory in MiB.
+  protected val executorOffHeapMemory = YarnSparkHadoopUtil.executorOffHeapMemorySizeAsMb(sparkConf)
   // Additional memory overhead.
   protected val memoryOverhead: Int = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD).getOrElse(
     math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, MEMORY_OVERHEAD_MIN)).toInt
@@ -149,7 +151,7 @@ private[yarn] class YarnAllocator(
   // Resource capability requested for each executor
   private[yarn] val resource: Resource = {
     val resource = Resource.newInstance(
-      executorMemory + memoryOverhead + pysparkWorkerMemory, executorCores)
+      executorMemory + executorOffHeapMemory + memoryOverhead + pysparkWorkerMemory, executorCores)
     ResourceRequestHelper.setResourceRequests(executorResourceRequests, resource)
     logDebug(s"Created resource capability: $resource")
     resource
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index 1103552..9cefc40 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -26,9 +26,8 @@ import org.apache.hadoop.yarn.api.records.{ApplicationAccessType, ContainerId, P
 import org.apache.hadoop.yarn.util.ConverterUtils
 
 import org.apache.spark.{SecurityManager, SparkConf}
+import org.apache.spark.internal.config._
 import org.apache.spark.launcher.YarnCommandBuilderUtils
-import org.apache.spark.resource.ResourceID
-import org.apache.spark.resource.ResourceUtils._
 import org.apache.spark.util.Utils
 
 object YarnSparkHadoopUtil {
@@ -184,4 +183,17 @@ object YarnSparkHadoopUtil {
     ConverterUtils.toContainerId(containerIdString)
   }
 
+  /**
+   * Convert MEMORY_OFFHEAP_SIZE to MB Unit, return 0 if MEMORY_OFFHEAP_ENABLED is false.
+   */
+  def executorOffHeapMemorySizeAsMb(sparkConf: SparkConf): Int = {
+    if (sparkConf.get(MEMORY_OFFHEAP_ENABLED)) {
+      val sizeInMB = Utils.memoryStringToMb(sparkConf.get(MEMORY_OFFHEAP_SIZE).toString)
+      require(sizeInMB > 0,
+        s"${MEMORY_OFFHEAP_SIZE.key} must be > 0 when ${MEMORY_OFFHEAP_ENABLED.key} == true")
+      sizeInMB
+    } else {
+      0
+    }
+  }
 }
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
index 4ac27ed..6f47a41 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
@@ -514,4 +514,25 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
     verify(rmClientSpy)
       .updateBlacklist(hosts.slice(10, 11).asJava, Collections.emptyList())
   }
+
+  test("SPARK-28577#YarnAllocator.resource.memory should include offHeapSize " +
+    "when offHeapEnabled is true.") {
+    val originalOffHeapEnabled = sparkConf.get(MEMORY_OFFHEAP_ENABLED)
+    val originalOffHeapSize = sparkConf.get(MEMORY_OFFHEAP_SIZE)
+    val executorMemory = sparkConf.get(EXECUTOR_MEMORY).toInt
+    val offHeapMemoryInMB = 1024L
+    val offHeapMemoryInByte = offHeapMemoryInMB * 1024 * 1024
+    try {
+      sparkConf.set(MEMORY_OFFHEAP_ENABLED, true)
+      sparkConf.set(MEMORY_OFFHEAP_SIZE, offHeapMemoryInByte)
+      val allocator = createAllocator(maxExecutors = 1,
+        additionalConfigs = Map(EXECUTOR_MEMORY.key -> executorMemory.toString))
+      val memory = allocator.resource.getMemory
+      assert(memory ==
+        executorMemory + offHeapMemoryInMB + YarnSparkHadoopUtil.MEMORY_OVERHEAD_MIN)
+    } finally {
+      sparkConf.set(MEMORY_OFFHEAP_ENABLED, originalOffHeapEnabled)
+      sparkConf.set(MEMORY_OFFHEAP_SIZE, originalOffHeapSize)
+    }
+  }
 }
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
index e7cde03..c88bb29 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
@@ -28,6 +28,7 @@ import org.scalatest.Matchers
 import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.UI._
 import org.apache.spark.util.{ResetSystemProperties, Utils}
 
@@ -140,4 +141,31 @@ class YarnSparkHadoopUtilSuite extends SparkFunSuite with Matchers with Logging
     }
 
   }
+
+  test("executorOffHeapMemorySizeAsMb when MEMORY_OFFHEAP_ENABLED is false") {
+    val executorOffHeapMemory = YarnSparkHadoopUtil.executorOffHeapMemorySizeAsMb(new SparkConf())
+    assert(executorOffHeapMemory == 0)
+  }
+
+  test("executorOffHeapMemorySizeAsMb when MEMORY_OFFHEAP_ENABLED is true") {
+    val offHeapMemoryInMB = 50
+    val offHeapMemory: Long = offHeapMemoryInMB * 1024 * 1024
+    val sparkConf = new SparkConf()
+      .set(MEMORY_OFFHEAP_ENABLED, true)
+      .set(MEMORY_OFFHEAP_SIZE, offHeapMemory)
+    val executorOffHeapMemory = YarnSparkHadoopUtil.executorOffHeapMemorySizeAsMb(sparkConf)
+    assert(executorOffHeapMemory == offHeapMemoryInMB)
+  }
+
+  test("executorMemoryOverhead when MEMORY_OFFHEAP_ENABLED is true, " +
+    "but MEMORY_OFFHEAP_SIZE not config scene") {
+    val sparkConf = new SparkConf()
+      .set(MEMORY_OFFHEAP_ENABLED, true)
+    val expected =
+      s"${MEMORY_OFFHEAP_SIZE.key} must be > 0 when ${MEMORY_OFFHEAP_ENABLED.key} == true"
+    val message = intercept[IllegalArgumentException] {
+      YarnSparkHadoopUtil.executorOffHeapMemorySizeAsMb(sparkConf)
+    }.getMessage
+    assert(message.contains(expected))
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org