You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by tg...@apache.org on 2014/10/02 20:48:42 UTC

git commit: Modify default YARN memory_overhead-- from an additive constant to a multiplier

Repository: spark
Updated Branches:
  refs/heads/master 82a6a083a -> b4fb7b80a


Modify default YARN memory_overhead-- from an additive constant to a multiplier

Redone against the recent master branch (https://github.com/apache/spark/pull/1391)

Author: Nishkam Ravi <nr...@cloudera.com>
Author: nravi <nr...@c1704.halxg.cloudera.com>
Author: nishkamravi2 <ni...@gmail.com>

Closes #2485 from nishkamravi2/master_nravi and squashes the following commits:

636a9ff [nishkamravi2] Update YarnAllocator.scala
8f76c8b [Nishkam Ravi] Doc change for yarn memory overhead
35daa64 [Nishkam Ravi] Slight change in the doc for yarn memory overhead
5ac2ec1 [Nishkam Ravi] Remove out
dac1047 [Nishkam Ravi] Additional documentation for yarn memory overhead issue
42c2c3d [Nishkam Ravi] Additional changes for yarn memory overhead issue
362da5e [Nishkam Ravi] Additional changes for yarn memory overhead
c726bd9 [Nishkam Ravi] Merge branch 'master' of https://github.com/apache/spark into master_nravi
f00fa31 [Nishkam Ravi] Improving logging for AM memoryOverhead
1cf2d1e [nishkamravi2] Update YarnAllocator.scala
ebcde10 [Nishkam Ravi] Modify default YARN memory_overhead-- from an additive constant to a multiplier (redone to resolve merge conflicts)
2e69f11 [Nishkam Ravi] Merge branch 'master' of https://github.com/apache/spark into master_nravi
efd688a [Nishkam Ravi] Merge branch 'master' of https://github.com/apache/spark
2b630f9 [nravi] Accept memory input as "30g", "512M" instead of an int value, to be consistent with rest of Spark
3bf8fad [nravi] Merge branch 'master' of https://github.com/apache/spark
5423a03 [nravi] Merge branch 'master' of https://github.com/apache/spark
eb663ca [nravi] Merge branch 'master' of https://github.com/apache/spark
df2aeb1 [nravi] Improved fix for ConcurrentModificationIssue (Spark-1097, Hadoop-10456)
6b840f0 [nravi] Undo the fix for SPARK-1758 (the problem is fixed)
5108700 [nravi] Fix in Spark for the Concurrent thread modification issue (SPARK-1097, HADOOP-10456)
681b36f [nravi] Fix for SPARK-1758: failing test org.apache.spark.JavaAPISuite.wholeTextFiles


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b4fb7b80
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b4fb7b80
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b4fb7b80

Branch: refs/heads/master
Commit: b4fb7b80a0d863500943d788ad3e34d502a6dafa
Parents: 82a6a08
Author: Nishkam Ravi <nr...@cloudera.com>
Authored: Thu Oct 2 13:48:35 2014 -0500
Committer: Thomas Graves <tg...@apache.org>
Committed: Thu Oct 2 13:48:35 2014 -0500

----------------------------------------------------------------------
 docs/running-on-yarn.md                             |  8 ++++----
 .../apache/spark/deploy/yarn/ClientArguments.scala  | 16 +++++++++-------
 .../org/apache/spark/deploy/yarn/ClientBase.scala   | 12 ++++++++----
 .../apache/spark/deploy/yarn/YarnAllocator.scala    | 16 ++++++++--------
 .../spark/deploy/yarn/YarnSparkHadoopUtil.scala     |  8 ++++++--
 5 files changed, 35 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b4fb7b80/docs/running-on-yarn.md
----------------------------------------------------------------------
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index 4b3a49e..695813a 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -79,16 +79,16 @@ Most of the configs are the same for Spark on YARN as for other deployment modes
 </tr>
 <tr>
  <td><code>spark.yarn.executor.memoryOverhead</code></td>
-  <td>384</td>
+  <td>executorMemory * 0.07, with minimum of 384 </td>
   <td>
-    The amount of off heap memory (in megabytes) to be allocated per executor. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc.
+    The amount of off-heap memory (in megabytes) to be allocated per executor. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc. This tends to grow with the executor size (typically 6-10%).
   </td>
 </tr>
 <tr>
   <td><code>spark.yarn.driver.memoryOverhead</code></td>
-  <td>384</td>
+  <td>driverMemory * 0.07, with minimum of 384 </td>
   <td>
-    The amount of off heap memory (in megabytes) to be allocated per driver. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc.
+    The amount of off-heap memory (in megabytes) to be allocated per driver. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc. This tends to grow with the container size (typically 6-10%).
   </td>
 </tr>
 <tr>

http://git-wip-us.apache.org/repos/asf/spark/blob/b4fb7b80/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index 26dbd62..a12f82d 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -21,7 +21,7 @@ import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.SparkConf
 import org.apache.spark.util.{Utils, IntParam, MemoryParam}
-
+import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
 
 // TODO: Add code and support for ensuring that yarn resource 'tasks' are location aware !
 private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf) {
@@ -39,15 +39,17 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
   var appName: String = "Spark"
   var priority = 0
 
+  parseArgs(args.toList)
+  loadEnvironmentArgs()
+
   // Additional memory to allocate to containers
   // For now, use driver's memory overhead as our AM container's memory overhead
-  val amMemoryOverhead = sparkConf.getInt(
-    "spark.yarn.driver.memoryOverhead", YarnSparkHadoopUtil.DEFAULT_MEMORY_OVERHEAD)
-  val executorMemoryOverhead = sparkConf.getInt(
-    "spark.yarn.executor.memoryOverhead", YarnSparkHadoopUtil.DEFAULT_MEMORY_OVERHEAD)
+  val amMemoryOverhead = sparkConf.getInt("spark.yarn.driver.memoryOverhead", 
+    math.max((MEMORY_OVERHEAD_FACTOR * amMemory).toInt, MEMORY_OVERHEAD_MIN))
+
+  val executorMemoryOverhead = sparkConf.getInt("spark.yarn.executor.memoryOverhead", 
+    math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, MEMORY_OVERHEAD_MIN))
 
-  parseArgs(args.toList)
-  loadEnvironmentArgs()
   validateArgs()
 
   /** Load any default arguments provided through environment variables and Spark properties. */

http://git-wip-us.apache.org/repos/asf/spark/blob/b4fb7b80/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
index 1cf19c1..6ecac6e 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
@@ -64,14 +64,18 @@ private[spark] trait ClientBase extends Logging {
       s"memory capability of the cluster ($maxMem MB per container)")
     val executorMem = args.executorMemory + executorMemoryOverhead
     if (executorMem > maxMem) {
-      throw new IllegalArgumentException(s"Required executor memory ($executorMem MB) " +
-        s"is above the max threshold ($maxMem MB) of this cluster!")
+      throw new IllegalArgumentException(s"Required executor memory (${args.executorMemory}" + 
+        s"+$executorMemoryOverhead MB) is above the max threshold ($maxMem MB) of this cluster!")
     }
     val amMem = args.amMemory + amMemoryOverhead
     if (amMem > maxMem) {
-      throw new IllegalArgumentException(s"Required AM memory ($amMem MB) " +
-        s"is above the max threshold ($maxMem MB) of this cluster!")
+      throw new IllegalArgumentException(s"Required AM memory (${args.amMemory}" + 
+        s"+$amMemoryOverhead MB) is above the max threshold ($maxMem MB) of this cluster!")
     }
+    logInfo("Will allocate AM container, with %d MB memory including %d MB overhead".format(
+      amMem,
+      amMemoryOverhead))
+
     // We could add checks to make sure the entire cluster has enough resources but that involves
     // getting all the node reports and computing ourselves.
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/b4fb7b80/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 299e38a..4f4f1d2 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -33,6 +33,7 @@ import org.apache.spark.scheduler.{SplitInfo, TaskSchedulerImpl}
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder
+import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
 
 object AllocationType extends Enumeration {
   type AllocationType = Value
@@ -78,10 +79,6 @@ private[yarn] abstract class YarnAllocator(
   // Containers to be released in next request to RM
   private val releasedContainers = new ConcurrentHashMap[ContainerId, Boolean]
 
-  // Additional memory overhead - in mb.
-  protected val memoryOverhead: Int = sparkConf.getInt("spark.yarn.executor.memoryOverhead",
-    YarnSparkHadoopUtil.DEFAULT_MEMORY_OVERHEAD)
-
   // Number of container requests that have been sent to, but not yet allocated by the
   // ApplicationMaster.
   private val numPendingAllocate = new AtomicInteger()
@@ -97,6 +94,10 @@ private[yarn] abstract class YarnAllocator(
   protected val (preferredHostToCount, preferredRackToCount) =
     generateNodeToWeight(conf, preferredNodes)
 
+  // Additional memory overhead - in mb.
+  protected val memoryOverhead: Int = sparkConf.getInt("spark.yarn.executor.memoryOverhead",
+    math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, MEMORY_OVERHEAD_MIN))
+
   private val launcherPool = new ThreadPoolExecutor(
     // max pool size of Integer.MAX_VALUE is ignored because we use an unbounded queue
     sparkConf.getInt("spark.yarn.containerLauncherMaxThreads", 25), Integer.MAX_VALUE,
@@ -114,12 +115,11 @@ private[yarn] abstract class YarnAllocator(
 
     // this is needed by alpha, do it here since we add numPending right after this
     val executorsPending = numPendingAllocate.get()
-
     if (missing > 0) {
+      val totalExecutorMemory = executorMemory + memoryOverhead
       numPendingAllocate.addAndGet(missing)
-      logInfo("Will Allocate %d executor containers, each with %d memory".format(
-        missing,
-        (executorMemory + memoryOverhead)))
+      logInfo(s"Will allocate $missing executor containers, each with $totalExecutorMemory MB " + 
+        s"memory including $memoryOverhead MB overhead")
     } else {
       logDebug("Empty allocation request ...")
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/b4fb7b80/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index 0b712c2..e1e0144 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -84,8 +84,12 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
 }
 
 object YarnSparkHadoopUtil {
-  // Additional memory overhead - in mb.
-  val DEFAULT_MEMORY_OVERHEAD = 384
+  // Additional memory overhead 
+  // 7% was arrived at experimentally, in the interest of minimizing memory waste while covering
+  // the common cases. Memory overhead tends to grow with container size.
+
+  val MEMORY_OVERHEAD_FACTOR = 0.07
+  val MEMORY_OVERHEAD_MIN = 384
 
   val ANY_HOST = "*"
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org