Posted to commits@spark.apache.org by sr...@apache.org on 2018/07/27 17:19:00 UTC

spark git commit: [SPARK-21960][STREAMING] Spark Streaming Dynamic Allocation should respect spark.executor.instances

Repository: spark
Updated Branches:
  refs/heads/master 0a0f68bae -> ee5a5a092


[SPARK-21960][STREAMING] Spark Streaming Dynamic Allocation should respect spark.executor.instances

## What changes were proposed in this pull request?

Removes the check that required `spark.executor.instances` to be unset (i.e. 0) when Streaming DRA is enabled.
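
For readers skimming the thread, a minimal configuration sketch (hedged: the app name and executor counts are invented for illustration, not taken from this patch) of a setup the removed check used to reject and that is accepted after this change:

```scala
import org.apache.spark.SparkConf

// Before this patch, a non-zero spark.executor.instances combined with
// streaming dynamic allocation threw an IllegalArgumentException.
// After it, the cluster manager provides the initial executors and
// streaming DRA rescales them once the StreamingContext has started.
val conf = new SparkConf()
  .setAppName("streaming-dra-example") // hypothetical app name
  .set("spark.executor.instances", "4")
  .set("spark.streaming.dynamicAllocation.enabled", "true")
  .set("spark.streaming.dynamicAllocation.minExecutors", "2")
  .set("spark.streaming.dynamicAllocation.maxExecutors", "8")
```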

## How was this patch tested?

Manual tests

My only concern with this PR is that `spark.executor.instances` (or the actual initial number of executors the cluster manager gives Spark) can fall outside the range `spark.streaming.dynamicAllocation.minExecutors` to `spark.streaming.dynamicAllocation.maxExecutors`. I don't see a good way around that, because this code only runs after the SparkContext has been created.
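
To make that concern concrete, a hypothetical set of values (the numbers are invented for illustration):

```scala
// The cluster manager starts 10 executors (spark.executor.instances = 10),
// while streaming DRA is bounded to [2, 5]. Nothing validates this at
// startup, because the streaming ExecutorAllocationManager only runs after
// the SparkContext has been created; the manager is expected to scale back
// within bounds over time as it reacts to batch statistics.
val initialExecutors = 10                 // spark.executor.instances
val (minExecutors, maxExecutors) = (2, 5) // streaming DRA bounds
assert(initialExecutors > maxExecutors)   // initial count is out of range
```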

Author: Karthik Palaniappan <ka...@google.com>

Closes #19183 from karth295/master.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ee5a5a09
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ee5a5a09
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ee5a5a09

Branch: refs/heads/master
Commit: ee5a5a092517c2ec06b005b11001dd9a4ae60db6
Parents: 0a0f68b
Author: Karthik Palaniappan <ka...@google.com>
Authored: Fri Jul 27 12:18:56 2018 -0500
Committer: Sean Owen <sr...@gmail.com>
Committed: Fri Jul 27 12:18:56 2018 -0500

----------------------------------------------------------------------
 .../scheduler/ExecutorAllocationManager.scala          | 13 ++++++-------
 1 file changed, 6 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ee5a5a09/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala
index 7b29b40..8717555 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala
@@ -26,7 +26,7 @@ import org.apache.spark.streaming.util.RecurringTimer
 import org.apache.spark.util.{Clock, Utils}
 
 /**
- * Class that manages executor allocated to a StreamingContext, and dynamically request or kill
+ * Class that manages executors allocated to a StreamingContext, and dynamically requests or kills
  * executors based on the statistics of the streaming computation. This is different from the core
  * dynamic allocation policy; the core policy relies on executors being idle for a while, but the
  * micro-batch model of streaming prevents any particular executors from being idle for a long
@@ -43,6 +43,10 @@ import org.apache.spark.util.{Clock, Utils}
  *
  * This feature should ideally be used in conjunction with backpressure, as backpressure ensures
  * system stability, while executors are being readjusted.
+ *
+ * Note that an initial set of executors (spark.executor.instances) was allocated when the
+ * SparkContext was created. This class scales executors up/down after the StreamingContext
+ * has started.
  */
 private[streaming] class ExecutorAllocationManager(
     client: ExecutorAllocationClient,
@@ -202,12 +206,7 @@ private[streaming] object ExecutorAllocationManager extends Logging {
   val MAX_EXECUTORS_KEY = "spark.streaming.dynamicAllocation.maxExecutors"
 
   def isDynamicAllocationEnabled(conf: SparkConf): Boolean = {
-    val numExecutor = conf.getInt("spark.executor.instances", 0)
     val streamingDynamicAllocationEnabled = conf.getBoolean(ENABLED_KEY, false)
-    if (numExecutor != 0 && streamingDynamicAllocationEnabled) {
-      throw new IllegalArgumentException(
-        "Dynamic Allocation for streaming cannot be enabled while spark.executor.instances is set.")
-    }
     if (Utils.isDynamicAllocationEnabled(conf) && streamingDynamicAllocationEnabled) {
       throw new IllegalArgumentException(
         """
@@ -217,7 +216,7 @@ private[streaming] object ExecutorAllocationManager extends Logging {
         """.stripMargin)
     }
     val testing = conf.getBoolean("spark.streaming.dynamicAllocation.testing", false)
-    numExecutor == 0 && streamingDynamicAllocationEnabled && (!Utils.isLocalMaster(conf) || testing)
+    streamingDynamicAllocationEnabled && (!Utils.isLocalMaster(conf) || testing)
   }
 
   def createIfEnabled(

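For quick reference, a simplified restatement of the patched predicate (hedged: `streamingDraEnabled` is a hypothetical stand-in paraphrasing the diff above, not Spark API):

```scala
// Streaming DRA is now considered enabled when the streaming flag is set
// and the master is non-local (unless the testing flag is set). Core
// dynamic allocation still triggers an IllegalArgumentException, while
// spark.executor.instances no longer participates in the decision.
def streamingDraEnabled(
    streamingDraFlag: Boolean,
    coreDynamicAllocation: Boolean,
    localMaster: Boolean,
    testing: Boolean): Boolean = {
  require(!(streamingDraFlag && coreDynamicAllocation),
    "Streaming DRA cannot be enabled together with core dynamic allocation")
  streamingDraFlag && (!localMaster || testing)
}
```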

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org