Posted to commits@spark.apache.org by an...@apache.org on 2015/02/02 21:27:10 UTC

spark git commit: SPARK-4585. Spark dynamic executor allocation should use minExecutors as initial number

Repository: spark
Updated Branches:
  refs/heads/master c081b21b1 -> b2047b55c


SPARK-4585. Spark dynamic executor allocation should use minExecutors as initial number

Author: Sandy Ryza <sa...@cloudera.com>

Closes #4051 from sryza/sandy-spark-4585 and squashes the following commits:

d1dd039 [Sandy Ryza] Add spark.dynamicAllocation.initialNumExecutors and make min and max not required
b7c59dc [Sandy Ryza] SPARK-4585. Spark dynamic executor allocation should use minExecutors as initial number
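
For readers who want to see the new behavior end to end, here is a minimal sketch of an
application-side configuration after this patch (the executor count below is illustrative,
not a recommendation):

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setAppName("dynamic-allocation-example")
      .set("spark.dynamicAllocation.enabled", "true")        // min/max no longer required
      .set("spark.shuffle.service.enabled", "true")          // external shuffle service is still required
      .set("spark.dynamicAllocation.initialExecutors", "3")  // optional; defaults to minExecutors
    val sc = new SparkContext(conf)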


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b2047b55
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b2047b55
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b2047b55

Branch: refs/heads/master
Commit: b2047b55c5fc85de6b63276d8ab9610d2496e08b
Parents: c081b21
Author: Sandy Ryza <sa...@cloudera.com>
Authored: Mon Feb 2 12:27:08 2015 -0800
Committer: Andrew Or <an...@databricks.com>
Committed: Mon Feb 2 12:27:08 2015 -0800

----------------------------------------------------------------------
 .../spark/ExecutorAllocationManager.scala       | 14 ++++++++------
 .../spark/ExecutorAllocationManagerSuite.scala  | 15 +++++++--------
 docs/configuration.md                           | 20 ++++++++++++++------
 docs/job-scheduling.md                          |  9 ++++-----
 .../spark/deploy/yarn/ClientArguments.scala     | 17 +++++++++++++----
 5 files changed, 46 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b2047b55/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index b28da19..5d5288b 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -49,6 +49,7 @@ import org.apache.spark.scheduler._
  *   spark.dynamicAllocation.enabled - Whether this feature is enabled
  *   spark.dynamicAllocation.minExecutors - Lower bound on the number of executors
  *   spark.dynamicAllocation.maxExecutors - Upper bound on the number of executors
+ *   spark.dynamicAllocation.initialExecutors - Number of executors to start with
  *
  *   spark.dynamicAllocation.schedulerBacklogTimeout (M) -
  *     If there are backlogged tasks for this duration, add new executors
@@ -70,9 +71,10 @@ private[spark] class ExecutorAllocationManager(
 
   import ExecutorAllocationManager._
 
-  // Lower and upper bounds on the number of executors. These are required.
-  private val minNumExecutors = conf.getInt("spark.dynamicAllocation.minExecutors", -1)
-  private val maxNumExecutors = conf.getInt("spark.dynamicAllocation.maxExecutors", -1)
+  // Lower and upper bounds on the number of executors.
+  private val minNumExecutors = conf.getInt("spark.dynamicAllocation.minExecutors", 0)
+  private val maxNumExecutors = conf.getInt("spark.dynamicAllocation.maxExecutors",
+    Integer.MAX_VALUE)
 
   // How long there must be backlogged tasks for before an addition is triggered
   private val schedulerBacklogTimeout = conf.getLong(
@@ -132,10 +134,10 @@ private[spark] class ExecutorAllocationManager(
    */
   private def validateSettings(): Unit = {
     if (minNumExecutors < 0 || maxNumExecutors < 0) {
-      throw new SparkException("spark.dynamicAllocation.{min/max}Executors must be set!")
+      throw new SparkException("spark.dynamicAllocation.{min/max}Executors must be positive!")
     }
-    if (minNumExecutors == 0 || maxNumExecutors == 0) {
-      throw new SparkException("spark.dynamicAllocation.{min/max}Executors cannot be 0!")
+    if (maxNumExecutors == 0) {
+      throw new SparkException("spark.dynamicAllocation.maxExecutors cannot be 0!")
     }
     if (minNumExecutors > maxNumExecutors) {
       throw new SparkException(s"spark.dynamicAllocation.minExecutors ($minNumExecutors) must " +

http://git-wip-us.apache.org/repos/asf/spark/blob/b2047b55/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index 0e4df17..57081dd 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -32,24 +32,23 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
   import ExecutorAllocationManagerSuite._
 
   test("verify min/max executors") {
-    // No min or max
     val conf = new SparkConf()
       .setMaster("local")
       .setAppName("test-executor-allocation-manager")
       .set("spark.dynamicAllocation.enabled", "true")
       .set("spark.dynamicAllocation.testing", "true")
-    intercept[SparkException] { new SparkContext(conf) }
-    SparkEnv.get.stop() // cleanup the created environment
-    SparkContext.clearActiveContext()
+    val sc0 = new SparkContext(conf)
+    assert(sc0.executorAllocationManager.isDefined)
+    sc0.stop()
 
-    // Only min
-    val conf1 = conf.clone().set("spark.dynamicAllocation.minExecutors", "1")
+    // Min < 0
+    val conf1 = conf.clone().set("spark.dynamicAllocation.minExecutors", "-1")
     intercept[SparkException] { new SparkContext(conf1) }
     SparkEnv.get.stop()
     SparkContext.clearActiveContext()
 
-    // Only max
-    val conf2 = conf.clone().set("spark.dynamicAllocation.maxExecutors", "2")
+    // Max < 0
+    val conf2 = conf.clone().set("spark.dynamicAllocation.maxExecutors", "-1")
     intercept[SparkException] { new SparkContext(conf2) }
     SparkEnv.get.stop()
     SparkContext.clearActiveContext()

http://git-wip-us.apache.org/repos/asf/spark/blob/b2047b55/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index e4e4b8d..08c6bef 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1098,24 +1098,32 @@ Apart from these, the following properties are also available, and may be useful
     available on YARN mode. For more detail, see the description
     <a href="job-scheduling.html#dynamic-resource-allocation">here</a>.
     <br><br>
-    This requires the following configurations to be set:
+    This requires <code>spark.shuffle.service.enabled</code> to be set.
+    The following configurations are also relevant:
     <code>spark.dynamicAllocation.minExecutors</code>,
     <code>spark.dynamicAllocation.maxExecutors</code>, and
-    <code>spark.shuffle.service.enabled</code>
+    <code>spark.dynamicAllocation.initialExecutors</code>
   </td>
 </tr>
 <tr>
   <td><code>spark.dynamicAllocation.minExecutors</code></td>
-  <td>(none)</td>
+  <td>0</td>
   <td>
-    Lower bound for the number of executors if dynamic allocation is enabled (required).
+    Lower bound for the number of executors if dynamic allocation is enabled.
   </td>
 </tr>
 <tr>
   <td><code>spark.dynamicAllocation.maxExecutors</code></td>
-  <td>(none)</td>
+  <td>Integer.MAX_VALUE</td>
+  <td>
+    Upper bound for the number of executors if dynamic allocation is enabled.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.dynamicAllocation.initialExecutors</code></td>
+  <td><code>spark.dynamicAllocation.minExecutors</code></td>
   <td>
-    Upper bound for the number of executors if dynamic allocation is enabled (required).
+    Initial number of executors to run if dynamic allocation is enabled.
   </td>
 </tr>
 <tr>
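
The same three properties can of course be supplied at submit time rather than in code;
for example (the values here are arbitrary):

    spark-submit \
      --conf spark.dynamicAllocation.enabled=true \
      --conf spark.shuffle.service.enabled=true \
      --conf spark.dynamicAllocation.minExecutors=2 \
      --conf spark.dynamicAllocation.initialExecutors=5 \
      --conf spark.dynamicAllocation.maxExecutors=20 \
      ...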

http://git-wip-us.apache.org/repos/asf/spark/blob/b2047b55/docs/job-scheduling.md
----------------------------------------------------------------------
diff --git a/docs/job-scheduling.md b/docs/job-scheduling.md
index a5425eb..5295e35 100644
--- a/docs/job-scheduling.md
+++ b/docs/job-scheduling.md
@@ -77,11 +77,10 @@ scheduling while sharing cluster resources efficiently.
 ### Configuration and Setup
 
 All configurations used by this feature live under the `spark.dynamicAllocation.*` namespace.
-To enable this feature, your application must set `spark.dynamicAllocation.enabled` to `true` and
-provide lower and upper bounds for the number of executors through
-`spark.dynamicAllocation.minExecutors` and `spark.dynamicAllocation.maxExecutors`. Other relevant
-configurations are described on the [configurations page](configuration.html#dynamic-allocation)
-and in the subsequent sections in detail.
+To enable this feature, your application must set `spark.dynamicAllocation.enabled` to `true`.
+Other relevant configurations are described on the
+[configurations page](configuration.html#dynamic-allocation) and in the subsequent sections in
+detail.
 
 Additionally, your application must use an external shuffle service. The purpose of the service is
 to preserve the shuffle files written by executors so the executors can be safely removed (more
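
On YARN, the external shuffle service mentioned above runs as an auxiliary service inside
each NodeManager; roughly, yarn-site.xml needs entries along these lines (consult the
version-specific docs for the exact setup steps):

    <property>
      <name>yarn.nodemanager.aux-services</name>
      <value>mapreduce_shuffle,spark_shuffle</value>
    </property>
    <property>
      <name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
      <value>org.apache.spark.network.yarn.YarnShuffleService</value>
    </property>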

http://git-wip-us.apache.org/repos/asf/spark/blob/b2047b55/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index f96b245..5eb2023 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -75,14 +75,23 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
       .orElse(sparkConf.getOption("spark.yarn.dist.archives").map(p => Utils.resolveURIs(p)))
       .orElse(sys.env.get("SPARK_YARN_DIST_ARCHIVES"))
       .orNull
-    // If dynamic allocation is enabled, start at the max number of executors
+    // If dynamic allocation is enabled, start at the configured initial number of executors.
+    // Default to minExecutors if no initialExecutors is set.
     if (isDynamicAllocationEnabled) {
+      val minExecutorsConf = "spark.dynamicAllocation.minExecutors"
+      val initialExecutorsConf = "spark.dynamicAllocation.initialExecutors"
       val maxExecutorsConf = "spark.dynamicAllocation.maxExecutors"
-      if (!sparkConf.contains(maxExecutorsConf)) {
+      val minNumExecutors = sparkConf.getInt(minExecutorsConf, 0)
+      val initialNumExecutors = sparkConf.getInt(initialExecutorsConf, minNumExecutors)
+      val maxNumExecutors = sparkConf.getInt(maxExecutorsConf, Integer.MAX_VALUE)
+
+      // If defined, initial executors must be between min and max
+      if (initialNumExecutors < minNumExecutors || initialNumExecutors > maxNumExecutors) {
         throw new IllegalArgumentException(
-          s"$maxExecutorsConf must be set if dynamic allocation is enabled!")
+          s"$initialExecutorsConf must be between $minExecutorsConf and $maxNumExecutors!")
       }
-      numExecutors = sparkConf.get(maxExecutorsConf).toInt
+
+      numExecutors = initialNumExecutors
     }
   }
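
Net effect for YARN: the client's starting request is initialExecutors if set, otherwise
minExecutors, and it must fall within [min, max]. As a pure-function paraphrase (the
helper name is illustrative, not part of the patch):

    // Illustrative helper: compute the starting executor count the YARN client requests.
    def resolveInitialExecutors(min: Int, initial: Option[Int], max: Int): Int = {
      val resolved = initial.getOrElse(min)  // initialExecutors defaults to minExecutors
      require(resolved >= min && resolved <= max,
        s"initial executors ($resolved) must be between $min and $max")
      resolved
    }

    resolveInitialExecutors(min = 2, initial = None, max = 20)     // => 2
    resolveInitialExecutors(min = 2, initial = Some(5), max = 20)  // => 5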
 

