You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2018/07/27 17:19:00 UTC
spark git commit: [SPARK-21960][STREAMING] Spark Streaming Dynamic Allocation should respect spark.executor.instances
Repository: spark
Updated Branches:
refs/heads/master 0a0f68bae -> ee5a5a092
[SPARK-21960][STREAMING] Spark Streaming Dynamic Allocation should respect spark.executor.instances
## What changes were proposed in this pull request?
Removes the check that required `spark.executor.instances` to be unset (0) when Streaming dynamic resource allocation (DRA) is enabled.
## How was this patch tested?
Manual tests
My only concern with this PR is that `spark.executor.instances` (or the actual initial number of executors that the cluster manager gives Spark) can fall outside the range from `spark.streaming.dynamicAllocation.minExecutors` to `spark.streaming.dynamicAllocation.maxExecutors`. I don't see a good way around that, because this code only runs after the SparkContext has already been created.
Author: Karthik Palaniappan <ka...@google.com>
Closes #19183 from karth295/master.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ee5a5a09
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ee5a5a09
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ee5a5a09
Branch: refs/heads/master
Commit: ee5a5a092517c2ec06b005b11001dd9a4ae60db6
Parents: 0a0f68b
Author: Karthik Palaniappan <ka...@google.com>
Authored: Fri Jul 27 12:18:56 2018 -0500
Committer: Sean Owen <sr...@gmail.com>
Committed: Fri Jul 27 12:18:56 2018 -0500
----------------------------------------------------------------------
.../scheduler/ExecutorAllocationManager.scala | 13 ++++++-------
1 file changed, 6 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/ee5a5a09/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala
index 7b29b40..8717555 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala
@@ -26,7 +26,7 @@ import org.apache.spark.streaming.util.RecurringTimer
import org.apache.spark.util.{Clock, Utils}
/**
- * Class that manages executor allocated to a StreamingContext, and dynamically request or kill
+ * Class that manages executors allocated to a StreamingContext, and dynamically requests or kills
* executors based on the statistics of the streaming computation. This is different from the core
* dynamic allocation policy; the core policy relies on executors being idle for a while, but the
* micro-batch model of streaming prevents any particular executors from being idle for a long
@@ -43,6 +43,10 @@ import org.apache.spark.util.{Clock, Utils}
*
* This features should ideally be used in conjunction with backpressure, as backpressure ensures
* system stability, while executors are being readjusted.
+ *
+ * Note that an initial set of executors (spark.executor.instances) was allocated when the
+ * SparkContext was created. This class scales executors up/down after the StreamingContext
+ * has started.
*/
private[streaming] class ExecutorAllocationManager(
client: ExecutorAllocationClient,
@@ -202,12 +206,7 @@ private[streaming] object ExecutorAllocationManager extends Logging {
val MAX_EXECUTORS_KEY = "spark.streaming.dynamicAllocation.maxExecutors"
def isDynamicAllocationEnabled(conf: SparkConf): Boolean = {
- val numExecutor = conf.getInt("spark.executor.instances", 0)
val streamingDynamicAllocationEnabled = conf.getBoolean(ENABLED_KEY, false)
- if (numExecutor != 0 && streamingDynamicAllocationEnabled) {
- throw new IllegalArgumentException(
- "Dynamic Allocation for streaming cannot be enabled while spark.executor.instances is set.")
- }
if (Utils.isDynamicAllocationEnabled(conf) && streamingDynamicAllocationEnabled) {
throw new IllegalArgumentException(
"""
@@ -217,7 +216,7 @@ private[streaming] object ExecutorAllocationManager extends Logging {
""".stripMargin)
}
val testing = conf.getBoolean("spark.streaming.dynamicAllocation.testing", false)
- numExecutor == 0 && streamingDynamicAllocationEnabled && (!Utils.isLocalMaster(conf) || testing)
+ streamingDynamicAllocationEnabled && (!Utils.isLocalMaster(conf) || testing)
}
def createIfEnabled(
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org