You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by tg...@apache.org on 2018/04/24 15:57:34 UTC

spark git commit: [SPARK-22683][CORE] Add a executorAllocationRatio parameter to throttle the parallelism of the dynamic allocation

Repository: spark
Updated Branches:
  refs/heads/master 4926a7c2f -> 55c4ca88a


[SPARK-22683][CORE] Add a executorAllocationRatio parameter to throttle the parallelism of the dynamic allocation

## What changes were proposed in this pull request?

By default, the dynamic allocation will request enough executors to maximize the
parallelism according to the number of tasks to process. While this minimizes the
latency of the job, with small tasks this setting can waste a lot of resources due to
executor allocation overhead, as some executor might not even do any work.
This setting allows to set a ratio that will be used to reduce the number of
target executors w.r.t. full parallelism.

The number of executors computed with this setting is still fenced by
`spark.dynamicAllocation.maxExecutors` and `spark.dynamicAllocation.minExecutors`

## How was this patch tested?
Units tests and runs on various actual workloads on a Yarn Cluster

Author: Julien Cuquemelle <j....@criteo.com>

Closes #19881 from jcuquemelle/AddTaskPerExecutorSlot.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/55c4ca88
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/55c4ca88
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/55c4ca88

Branch: refs/heads/master
Commit: 55c4ca88a3b093ee197a8689631be8d1fac1f10f
Parents: 4926a7c
Author: Julien Cuquemelle <j....@criteo.com>
Authored: Tue Apr 24 10:56:55 2018 -0500
Committer: Thomas Graves <tg...@oath.com>
Committed: Tue Apr 24 10:56:55 2018 -0500

----------------------------------------------------------------------
 .../spark/ExecutorAllocationManager.scala       | 24 +++++++++++---
 .../apache/spark/internal/config/package.scala  |  4 +++
 .../spark/ExecutorAllocationManagerSuite.scala  | 33 ++++++++++++++++++++
 docs/configuration.md                           | 18 +++++++++++
 4 files changed, 74 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/55c4ca88/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 189d913..aa363ee 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -26,7 +26,7 @@ import scala.util.control.{ControlThrowable, NonFatal}
 import com.codahale.metrics.{Gauge, MetricRegistry}
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config.{DYN_ALLOCATION_MAX_EXECUTORS, DYN_ALLOCATION_MIN_EXECUTORS}
+import org.apache.spark.internal.config._
 import org.apache.spark.metrics.source.Source
 import org.apache.spark.scheduler._
 import org.apache.spark.storage.BlockManagerMaster
@@ -69,6 +69,10 @@ import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
  *   spark.dynamicAllocation.maxExecutors - Upper bound on the number of executors
  *   spark.dynamicAllocation.initialExecutors - Number of executors to start with
  *
+ *   spark.dynamicAllocation.executorAllocationRatio -
+ *     This is used to reduce the parallelism of the dynamic allocation that can waste
+ *     resources when tasks are small
+ *
  *   spark.dynamicAllocation.schedulerBacklogTimeout (M) -
  *     If there are backlogged tasks for this duration, add new executors
  *
@@ -116,9 +120,12 @@ private[spark] class ExecutorAllocationManager(
   // TODO: The default value of 1 for spark.executor.cores works right now because dynamic
   // allocation is only supported for YARN and the default number of cores per executor in YARN is
   // 1, but it might need to be attained differently for different cluster managers
-  private val tasksPerExecutor =
+  private val tasksPerExecutorForFullParallelism =
     conf.getInt("spark.executor.cores", 1) / conf.getInt("spark.task.cpus", 1)
 
+  private val executorAllocationRatio =
+    conf.get(DYN_ALLOCATION_EXECUTOR_ALLOCATION_RATIO)
+
   validateSettings()
 
   // Number of executors to add in the next round
@@ -209,8 +216,13 @@ private[spark] class ExecutorAllocationManager(
       throw new SparkException("Dynamic allocation of executors requires the external " +
         "shuffle service. You may enable this through spark.shuffle.service.enabled.")
     }
-    if (tasksPerExecutor == 0) {
-      throw new SparkException("spark.executor.cores must not be less than spark.task.cpus.")
+    if (tasksPerExecutorForFullParallelism == 0) {
+      throw new SparkException("spark.executor.cores must not be < spark.task.cpus.")
+    }
+
+    if (executorAllocationRatio > 1.0 || executorAllocationRatio <= 0.0) {
+      throw new SparkException(
+        "spark.dynamicAllocation.executorAllocationRatio must be > 0 and <= 1.0")
     }
   }
 
@@ -273,7 +285,9 @@ private[spark] class ExecutorAllocationManager(
    */
   private def maxNumExecutorsNeeded(): Int = {
     val numRunningOrPendingTasks = listener.totalPendingTasks + listener.totalRunningTasks
-    (numRunningOrPendingTasks + tasksPerExecutor - 1) / tasksPerExecutor
+    math.ceil(numRunningOrPendingTasks * executorAllocationRatio /
+              tasksPerExecutorForFullParallelism)
+      .toInt
   }
 
   private def totalRunningTasks(): Int = synchronized {

http://git-wip-us.apache.org/repos/asf/spark/blob/55c4ca88/core/src/main/scala/org/apache/spark/internal/config/package.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 99d779f..6bb98c3 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -126,6 +126,10 @@ package object config {
   private[spark] val DYN_ALLOCATION_MAX_EXECUTORS =
     ConfigBuilder("spark.dynamicAllocation.maxExecutors").intConf.createWithDefault(Int.MaxValue)
 
+  private[spark] val DYN_ALLOCATION_EXECUTOR_ALLOCATION_RATIO =
+    ConfigBuilder("spark.dynamicAllocation.executorAllocationRatio")
+      .doubleConf.createWithDefault(1.0)
+
   private[spark] val LOCALITY_WAIT = ConfigBuilder("spark.locality.wait")
     .timeConf(TimeUnit.MILLISECONDS)
     .createWithDefaultString("3s")

http://git-wip-us.apache.org/repos/asf/spark/blob/55c4ca88/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index 9807d12..3cfb0a9 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -145,6 +145,39 @@ class ExecutorAllocationManagerSuite
     assert(numExecutorsToAdd(manager) === 1)
   }
 
+  def testAllocationRatio(cores: Int, divisor: Double, expected: Int): Unit = {
+    val conf = new SparkConf()
+      .setMaster("myDummyLocalExternalClusterManager")
+      .setAppName("test-executor-allocation-manager")
+      .set("spark.dynamicAllocation.enabled", "true")
+      .set("spark.dynamicAllocation.testing", "true")
+      .set("spark.dynamicAllocation.maxExecutors", "15")
+      .set("spark.dynamicAllocation.minExecutors", "3")
+      .set("spark.dynamicAllocation.executorAllocationRatio", divisor.toString)
+      .set("spark.executor.cores", cores.toString)
+    val sc = new SparkContext(conf)
+    contexts += sc
+    var manager = sc.executorAllocationManager.get
+    post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(0, 20)))
+    for (i <- 0 to 5) {
+      addExecutors(manager)
+    }
+    assert(numExecutorsTarget(manager) === expected)
+    sc.stop()
+  }
+
+  test("executionAllocationRatio is correctly handled") {
+    testAllocationRatio(1, 0.5, 10)
+    testAllocationRatio(1, 1.0/3.0, 7)
+    testAllocationRatio(2, 1.0/3.0, 4)
+    testAllocationRatio(1, 0.385, 8)
+
+    // max/min executors capping
+    testAllocationRatio(1, 1.0, 15) // should be 20 but capped by max
+    testAllocationRatio(4, 1.0/3.0, 3)  // should be 2 but elevated by min
+  }
+
+
   test("add executors capped by num pending tasks") {
     sc = createSparkContext(0, 10, 0)
     val manager = sc.executorAllocationManager.get

http://git-wip-us.apache.org/repos/asf/spark/blob/55c4ca88/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index 4d4d0c5..fb02d7e 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1753,6 +1753,7 @@ Apart from these, the following properties are also available, and may be useful
     <code>spark.dynamicAllocation.minExecutors</code>,
     <code>spark.dynamicAllocation.maxExecutors</code>, and
     <code>spark.dynamicAllocation.initialExecutors</code>
+    <code>spark.dynamicAllocation.executorAllocationRatio</code>
   </td>
 </tr>
 <tr>
@@ -1798,6 +1799,23 @@ Apart from these, the following properties are also available, and may be useful
   </td>
 </tr>
 <tr>
+  <td><code>spark.dynamicAllocation.executorAllocationRatio</code></td>
+  <td>1</td>
+  <td>
+    By default, the dynamic allocation will request enough executors to maximize the
+    parallelism according to the number of tasks to process. While this minimizes the
+    latency of the job, with small tasks this setting can waste a lot of resources due to
+    executor allocation overhead, as some executor might not even do any work.
+    This setting allows to set a ratio that will be used to reduce the number of
+    executors w.r.t. full parallelism.
+    Defaults to 1.0 to give maximum parallelism.
+    0.5 will divide the target number of executors by 2
+    The target number of executors computed by the dynamicAllocation can still be overriden
+    by the <code>spark.dynamicAllocation.minExecutors</code> and
+    <code>spark.dynamicAllocation.maxExecutors</code> settings
+  </td>
+</tr>
+<tr>
   <td><code>spark.dynamicAllocation.schedulerBacklogTimeout</code></td>
   <td>1s</td>
   <td>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org