You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by an...@apache.org on 2014/10/30 23:31:27 UTC

git commit: [SPARK-4138][SPARK-4139] Improve dynamic allocation settings

Repository: spark
Updated Branches:
  refs/heads/master 24c512925 -> 26f092d4e


[SPARK-4138][SPARK-4139] Improve dynamic allocation settings

This should be merged after #2746 (SPARK-3795).

**SPARK-4138**. If the user sets both the number of executors and `spark.dynamicAllocation.enabled`, we should throw an exception.

**SPARK-4139**. If the user sets `spark.dynamicAllocation.enabled`, we should use the max number of executors as the starting number of executors because the first job is likely to run immediately after application startup. If the latter is not set, throw an exception.

Author: Andrew Or <an...@databricks.com>

Closes #3002 from andrewor14/yarn-set-executors and squashes the following commits:

c528fce [Andrew Or] Merge branch 'master' of github.com:apache/spark into yarn-set-executors
55d4699 [Andrew Or] Bug fix: `isDynamicAllocationEnabled` was always false
2b0ccec [Andrew Or] Start the number of executors at the max
022bfde [Andrew Or] Guard against incompatible settings of number of executors


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/26f092d4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/26f092d4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/26f092d4

Branch: refs/heads/master
Commit: 26f092d4e32cc1f7e279646075eaf1e495395923
Parents: 24c5129
Author: Andrew Or <an...@databricks.com>
Authored: Thu Oct 30 15:31:23 2014 -0700
Committer: Andrew Or <an...@databricks.com>
Committed: Thu Oct 30 15:31:23 2014 -0700

----------------------------------------------------------------------
 .../yarn/ApplicationMasterArguments.scala       |  3 +-
 .../spark/deploy/yarn/ClientArguments.scala     | 30 +++++++++++++++-----
 .../spark/deploy/yarn/YarnSparkHadoopUtil.scala |  2 ++
 .../cluster/YarnClusterSchedulerBackend.scala   |  4 +--
 4 files changed, 29 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/26f092d4/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
index 5c54e34..104db4f 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.deploy.yarn
 
 import org.apache.spark.util.{MemoryParam, IntParam}
+import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
 import collection.mutable.ArrayBuffer
 
 class ApplicationMasterArguments(val args: Array[String]) {
@@ -26,7 +27,7 @@ class ApplicationMasterArguments(val args: Array[String]) {
   var userArgs: Seq[String] = Seq[String]()
   var executorMemory = 1024
   var executorCores = 1
-  var numExecutors = ApplicationMasterArguments.DEFAULT_NUMBER_EXECUTORS
+  var numExecutors = DEFAULT_NUMBER_EXECUTORS
 
   parseArgs(args.toList)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/26f092d4/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index a12f82d..4d85945 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -20,8 +20,8 @@ package org.apache.spark.deploy.yarn
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.SparkConf
-import org.apache.spark.util.{Utils, IntParam, MemoryParam}
 import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
+import org.apache.spark.util.{Utils, IntParam, MemoryParam}
 
 // TODO: Add code and support for ensuring that yarn resource 'tasks' are location aware !
 private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf) {
@@ -33,23 +33,25 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
   var userArgs: Seq[String] = Seq[String]()
   var executorMemory = 1024 // MB
   var executorCores = 1
-  var numExecutors = 2
+  var numExecutors = DEFAULT_NUMBER_EXECUTORS
   var amQueue = sparkConf.get("spark.yarn.queue", "default")
   var amMemory: Int = 512 // MB
   var appName: String = "Spark"
   var priority = 0
 
-  parseArgs(args.toList)
-  loadEnvironmentArgs()
-
   // Additional memory to allocate to containers
   // For now, use driver's memory overhead as our AM container's memory overhead
-  val amMemoryOverhead = sparkConf.getInt("spark.yarn.driver.memoryOverhead", 
+  val amMemoryOverhead = sparkConf.getInt("spark.yarn.driver.memoryOverhead",
     math.max((MEMORY_OVERHEAD_FACTOR * amMemory).toInt, MEMORY_OVERHEAD_MIN))
 
-  val executorMemoryOverhead = sparkConf.getInt("spark.yarn.executor.memoryOverhead", 
+  val executorMemoryOverhead = sparkConf.getInt("spark.yarn.executor.memoryOverhead",
     math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, MEMORY_OVERHEAD_MIN))
 
+  private val isDynamicAllocationEnabled =
+    sparkConf.getBoolean("spark.dynamicAllocation.enabled", false)
+
+  parseArgs(args.toList)
+  loadEnvironmentArgs()
   validateArgs()
 
   /** Load any default arguments provided through environment variables and Spark properties. */
@@ -64,6 +66,15 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
       .orElse(sys.env.get("SPARK_YARN_DIST_ARCHIVES"))
       .orElse(sparkConf.getOption("spark.yarn.dist.archives").map(p => Utils.resolveURIs(p)))
       .orNull
+    // If dynamic allocation is enabled, start at the max number of executors
+    if (isDynamicAllocationEnabled) {
+      val maxExecutorsConf = "spark.dynamicAllocation.maxExecutors"
+      if (!sparkConf.contains(maxExecutorsConf)) {
+        throw new IllegalArgumentException(
+          s"$maxExecutorsConf must be set if dynamic allocation is enabled!")
+      }
+      numExecutors = sparkConf.get(maxExecutorsConf).toInt
+    }
   }
 
   /**
@@ -113,6 +124,11 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
           if (args(0) == "--num-workers") {
             println("--num-workers is deprecated. Use --num-executors instead.")
           }
+          // Dynamic allocation is not compatible with this option
+          if (isDynamicAllocationEnabled) {
+            throw new IllegalArgumentException("Explicitly setting the number " +
+              "of executors is not compatible with spark.dynamicAllocation.enabled!")
+          }
           numExecutors = value
           args = tail
 

http://git-wip-us.apache.org/repos/asf/spark/blob/26f092d4/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index e1e0144..7d453ec 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -93,6 +93,8 @@ object YarnSparkHadoopUtil {
 
   val ANY_HOST = "*"
 
+  val DEFAULT_NUMBER_EXECUTORS = 2
+
   // All RM requests are issued with same priority : we do not (yet) have any distinction between
   // request types (like map/reduce in hadoop for example)
   val RM_REQUEST_PRIORITY = 1

http://git-wip-us.apache.org/repos/asf/spark/blob/26f092d4/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
index a96a54f..b1de81e 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.scheduler.cluster
 
 import org.apache.spark.SparkContext
-import org.apache.spark.deploy.yarn.ApplicationMasterArguments
+import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
 import org.apache.spark.scheduler.TaskSchedulerImpl
 import org.apache.spark.util.IntParam
 
@@ -29,7 +29,7 @@ private[spark] class YarnClusterSchedulerBackend(
 
   override def start() {
     super.start()
-    totalExpectedExecutors = ApplicationMasterArguments.DEFAULT_NUMBER_EXECUTORS
+    totalExpectedExecutors = DEFAULT_NUMBER_EXECUTORS
     if (System.getenv("SPARK_EXECUTOR_INSTANCES") != null) {
       totalExpectedExecutors = IntParam.unapply(System.getenv("SPARK_EXECUTOR_INSTANCES"))
         .getOrElse(totalExpectedExecutors)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org