You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2019/03/17 00:45:26 UTC
[spark] branch master updated: [SPARK-26941][YARN] Fix incorrect
computation of maxNumExecutorFailures in ApplicationMaster for streaming
This is an automated email from the ASF dual-hosted git repository.
srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new cad475d [SPARK-26941][YARN] Fix incorrect computation of maxNumExecutorFailures in ApplicationMaster for streaming
cad475d is described below
commit cad475dcc9376557f882859856286e858002389a
Author: Liupengcheng <li...@xiaomi.com>
AuthorDate: Sat Mar 16 19:45:05 2019 -0500
[SPARK-26941][YARN] Fix incorrect computation of maxNumExecutorFailures in ApplicationMaster for streaming
## What changes were proposed in this pull request?
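Currently, when streaming dynamic allocation is enabled for a streaming application, `ApplicationMaster` still derives `maxNumExecutorFailures` from the core dynamic-allocation setting `spark.dynamicAllocation.maxExecutors`, or from `spark.executor.instances` when core dynamic allocation is disabled.
It should use `spark.streaming.dynamicAllocation.maxExecutors` instead, because that setting bounds the number of executors in this mode. To make the streaming settings readable from the YARN `ApplicationMaster`, this patch moves the `spark.streaming.dynamicAllocation.*` config entries into `core` as `org.apache.spark.internal.config.Streaming`. It also adds `Utils.isStreamingDynamicAllocationEnabled`.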
Related code:
https://github.com/apache/spark/blob/f87153a3acbab9260db356a451ddae86adba41b2/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala#L101
## How was this patch tested?
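N/A. No new tests were added; the existing `ExecutorAllocationManagerSuite` was updated to use the new config entries.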
Please review http://spark.apache.org/contributing.html before opening a pull request.
Closes #23845 from liupc/Fix-incorrect-maxNumExecutorFailures-for-streaming.
Lead-authored-by: Liupengcheng <li...@xiaomi.com>
Co-authored-by: liupengcheng <li...@xiaomi.com>
Signed-off-by: Sean Owen <se...@databricks.com>
---
.../apache/spark/internal/config/Streaming.scala | 68 ++++++++++++++++++++++
.../main/scala/org/apache/spark/util/Utils.scala | 7 +++
.../spark/deploy/yarn/ApplicationMaster.scala | 5 +-
.../scheduler/ExecutorAllocationManager.scala | 66 +++++----------------
.../scheduler/ExecutorAllocationManagerSuite.scala | 17 +++---
5 files changed, 104 insertions(+), 59 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/internal/config/Streaming.scala b/core/src/main/scala/org/apache/spark/internal/config/Streaming.scala
new file mode 100644
index 0000000..6e58c09
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/internal/config/Streaming.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.internal.config
+
+import java.util.concurrent.TimeUnit
+
+private[spark] object Streaming {
+
+ private[spark] val STREAMING_DYN_ALLOCATION_ENABLED =
+ ConfigBuilder("spark.streaming.dynamicAllocation.enabled")
+ .booleanConf
+ .createWithDefault(false)
+
+ private[spark] val STREAMING_DYN_ALLOCATION_TESTING =
+ ConfigBuilder("spark.streaming.dynamicAllocation.testing")
+ .booleanConf
+ .createWithDefault(false)
+
+ private[spark] val STREAMING_DYN_ALLOCATION_MIN_EXECUTORS =
+ ConfigBuilder("spark.streaming.dynamicAllocation.minExecutors")
+ .intConf
+ .checkValue(_ > 0, "The min executor number of streaming dynamic " +
+ "allocation must be positive.")
+ .createOptional
+
+ private[spark] val STREAMING_DYN_ALLOCATION_MAX_EXECUTORS =
+ ConfigBuilder("spark.streaming.dynamicAllocation.maxExecutors")
+ .intConf
+ .checkValue(_ > 0, "The max executor number of streaming dynamic " +
+ "allocation must be positive.")
+ .createWithDefault(Int.MaxValue)
+
+ private[spark] val STREAMING_DYN_ALLOCATION_SCALING_INTERVAL =
+ ConfigBuilder("spark.streaming.dynamicAllocation.scalingInterval")
+ .timeConf(TimeUnit.SECONDS)
+ .checkValue(_ > 0, "The scaling interval of streaming dynamic " +
+ "allocation must be positive.")
+ .createWithDefault(60)
+
+ private[spark] val STREAMING_DYN_ALLOCATION_SCALING_UP_RATIO =
+ ConfigBuilder("spark.streaming.dynamicAllocation.scalingUpRatio")
+ .doubleConf
+ .checkValue(_ > 0, "The scaling up ratio of streaming dynamic " +
+ "allocation must be positive.")
+ .createWithDefault(0.9)
+
+ private[spark] val STREAMING_DYN_ALLOCATION_SCALING_DOWN_RATIO =
+ ConfigBuilder("spark.streaming.dynamicAllocation.scalingDownRatio")
+ .doubleConf
+ .checkValue(_ > 0, "The scaling down ratio of streaming dynamic " +
+ "allocation must be positive.")
+ .createWithDefault(0.3)
+}
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 2d00453..0398625 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -60,6 +60,7 @@ import org.apache.spark._
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.{config, Logging}
import org.apache.spark.internal.config._
+import org.apache.spark.internal.config.Streaming._
import org.apache.spark.internal.config.Tests.IS_TESTING
import org.apache.spark.internal.config.UI._
import org.apache.spark.internal.config.Worker._
@@ -2490,6 +2491,12 @@ private[spark] object Utils extends Logging {
(!isLocalMaster(conf) || conf.get(DYN_ALLOCATION_TESTING))
}
+ def isStreamingDynamicAllocationEnabled(conf: SparkConf): Boolean = {
+ val streamingDynamicAllocationEnabled = conf.get(STREAMING_DYN_ALLOCATION_ENABLED)
+ streamingDynamicAllocationEnabled &&
+ (!isLocalMaster(conf) || conf.get(STREAMING_DYN_ALLOCATION_TESTING))
+ }
+
/**
* Return the initial number of executors for dynamic allocation.
*/
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 91e1fbd..9ed3b78 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -45,6 +45,7 @@ import org.apache.spark.deploy.history.HistoryServer
import org.apache.spark.deploy.yarn.config._
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
+import org.apache.spark.internal.config.Streaming.STREAMING_DYN_ALLOCATION_MAX_EXECUTORS
import org.apache.spark.internal.config.UI._
import org.apache.spark.metrics.{MetricsSystem, MetricsSystemInstances}
import org.apache.spark.rpc._
@@ -100,7 +101,9 @@ private[spark] class ApplicationMaster(
private val maxNumExecutorFailures = {
val effectiveNumExecutors =
- if (Utils.isDynamicAllocationEnabled(sparkConf)) {
+ if (Utils.isStreamingDynamicAllocationEnabled(sparkConf)) {
+ sparkConf.get(STREAMING_DYN_ALLOCATION_MAX_EXECUTORS)
+ } else if (Utils.isDynamicAllocationEnabled(sparkConf)) {
sparkConf.get(DYN_ALLOCATION_MAX_EXECUTORS)
} else {
sparkConf.get(EXECUTOR_INSTANCES).getOrElse(0)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala
index 8717555..e85a3b9 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala
@@ -22,6 +22,7 @@ import scala.util.Random
import org.apache.spark.{ExecutorAllocationClient, SparkConf}
import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.Streaming._
import org.apache.spark.streaming.util.RecurringTimer
import org.apache.spark.util.{Clock, Utils}
@@ -55,17 +56,12 @@ private[streaming] class ExecutorAllocationManager(
batchDurationMs: Long,
clock: Clock) extends StreamingListener with Logging {
- import ExecutorAllocationManager._
-
- private val scalingIntervalSecs = conf.getTimeAsSeconds(
- SCALING_INTERVAL_KEY,
- s"${SCALING_INTERVAL_DEFAULT_SECS}s")
- private val scalingUpRatio = conf.getDouble(SCALING_UP_RATIO_KEY, SCALING_UP_RATIO_DEFAULT)
- private val scalingDownRatio = conf.getDouble(SCALING_DOWN_RATIO_KEY, SCALING_DOWN_RATIO_DEFAULT)
- private val minNumExecutors = conf.getInt(
- MIN_EXECUTORS_KEY,
- math.max(1, receiverTracker.numReceivers))
- private val maxNumExecutors = conf.getInt(MAX_EXECUTORS_KEY, Integer.MAX_VALUE)
+ private val scalingIntervalSecs = conf.get(STREAMING_DYN_ALLOCATION_SCALING_INTERVAL)
+ private val scalingUpRatio = conf.get(STREAMING_DYN_ALLOCATION_SCALING_UP_RATIO)
+ private val scalingDownRatio = conf.get(STREAMING_DYN_ALLOCATION_SCALING_DOWN_RATIO)
+ private val minNumExecutors = conf.get(STREAMING_DYN_ALLOCATION_MIN_EXECUTORS)
+ .getOrElse(math.max(1, receiverTracker.numReceivers()))
+ private val maxNumExecutors = conf.get(STREAMING_DYN_ALLOCATION_MAX_EXECUTORS)
private val timer = new RecurringTimer(clock, scalingIntervalSecs * 1000,
_ => manageAllocation(), "streaming-executor-allocation-manager")
@@ -151,33 +147,16 @@ private[streaming] class ExecutorAllocationManager(
private def validateSettings(): Unit = {
require(
- scalingIntervalSecs > 0,
- s"Config $SCALING_INTERVAL_KEY must be more than 0")
-
- require(
- scalingUpRatio > 0,
- s"Config $SCALING_UP_RATIO_KEY must be more than 0")
-
- require(
- scalingDownRatio > 0,
- s"Config $SCALING_DOWN_RATIO_KEY must be more than 0")
-
- require(
- minNumExecutors > 0,
- s"Config $MIN_EXECUTORS_KEY must be more than 0")
-
- require(
- maxNumExecutors > 0,
- s"$MAX_EXECUTORS_KEY must be more than 0")
-
- require(
scalingUpRatio > scalingDownRatio,
- s"Config $SCALING_UP_RATIO_KEY must be more than config $SCALING_DOWN_RATIO_KEY")
+ s"Config ${STREAMING_DYN_ALLOCATION_SCALING_UP_RATIO.key} must be more than config " +
+ s"${STREAMING_DYN_ALLOCATION_SCALING_DOWN_RATIO.key}")
- if (conf.contains(MIN_EXECUTORS_KEY) && conf.contains(MAX_EXECUTORS_KEY)) {
+ if (conf.contains(STREAMING_DYN_ALLOCATION_MIN_EXECUTORS.key) &&
+ conf.contains(STREAMING_DYN_ALLOCATION_MAX_EXECUTORS.key)) {
require(
maxNumExecutors >= minNumExecutors,
- s"Config $MAX_EXECUTORS_KEY must be more than config $MIN_EXECUTORS_KEY")
+ s"Config ${STREAMING_DYN_ALLOCATION_MAX_EXECUTORS.key} must be more than config " +
+ s"${STREAMING_DYN_ALLOCATION_MIN_EXECUTORS.key}")
}
}
@@ -190,23 +169,9 @@ private[streaming] class ExecutorAllocationManager(
}
private[streaming] object ExecutorAllocationManager extends Logging {
- val ENABLED_KEY = "spark.streaming.dynamicAllocation.enabled"
-
- val SCALING_INTERVAL_KEY = "spark.streaming.dynamicAllocation.scalingInterval"
- val SCALING_INTERVAL_DEFAULT_SECS = 60
-
- val SCALING_UP_RATIO_KEY = "spark.streaming.dynamicAllocation.scalingUpRatio"
- val SCALING_UP_RATIO_DEFAULT = 0.9
-
- val SCALING_DOWN_RATIO_KEY = "spark.streaming.dynamicAllocation.scalingDownRatio"
- val SCALING_DOWN_RATIO_DEFAULT = 0.3
-
- val MIN_EXECUTORS_KEY = "spark.streaming.dynamicAllocation.minExecutors"
-
- val MAX_EXECUTORS_KEY = "spark.streaming.dynamicAllocation.maxExecutors"
def isDynamicAllocationEnabled(conf: SparkConf): Boolean = {
- val streamingDynamicAllocationEnabled = conf.getBoolean(ENABLED_KEY, false)
+ val streamingDynamicAllocationEnabled = Utils.isStreamingDynamicAllocationEnabled(conf)
if (Utils.isDynamicAllocationEnabled(conf) && streamingDynamicAllocationEnabled) {
throw new IllegalArgumentException(
"""
@@ -215,8 +180,7 @@ private[streaming] object ExecutorAllocationManager extends Logging {
|false to use Dynamic Allocation in streaming.
""".stripMargin)
}
- val testing = conf.getBoolean("spark.streaming.dynamicAllocation.testing", false)
- streamingDynamicAllocationEnabled && (!Utils.isLocalMaster(conf) || testing)
+ streamingDynamicAllocationEnabled
}
def createIfEnabled(
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala
index fcbba00..22d027d 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala
@@ -26,6 +26,7 @@ import org.scalatest.time.SpanSugar._
import org.apache.spark.{ExecutorAllocationClient, SparkConf, SparkFunSuite}
import org.apache.spark.internal.config.{DYN_ALLOCATION_ENABLED, DYN_ALLOCATION_TESTING}
+import org.apache.spark.internal.config.Streaming._
import org.apache.spark.streaming.{DummyInputDStream, Seconds, StreamingContext}
import org.apache.spark.util.{ManualClock, Utils}
@@ -33,8 +34,6 @@ import org.apache.spark.util.{ManualClock, Utils}
class ExecutorAllocationManagerSuite extends SparkFunSuite
with BeforeAndAfter with BeforeAndAfterAll with MockitoSugar with PrivateMethodTester {
- import ExecutorAllocationManager._
-
private val batchDurationMillis = 1000L
private var allocationClient: ExecutorAllocationClient = null
private var clock: StreamManualClock = null
@@ -58,7 +57,7 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite
reset(allocationClient)
when(allocationClient.getExecutorIds()).thenReturn(Seq("1", "2"))
addBatchProcTime(allocationManager, batchProcTimeMs.toLong)
- val advancedTime = SCALING_INTERVAL_DEFAULT_SECS * 1000 + 1
+ val advancedTime = STREAMING_DYN_ALLOCATION_SCALING_INTERVAL.defaultValue.get * 1000 + 1
val expectedWaitTime = clock.getTimeMillis() + advancedTime
clock.advance(advancedTime)
// Make sure ExecutorAllocationManager.manageAllocation is called
@@ -101,25 +100,29 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite
}
// Batch proc time slightly more than the scale up ratio, should increase allocation by 1
- addBatchProcTimeAndVerifyAllocation(batchDurationMillis * SCALING_UP_RATIO_DEFAULT + 1) {
+ addBatchProcTimeAndVerifyAllocation(
+ batchDurationMillis * STREAMING_DYN_ALLOCATION_SCALING_UP_RATIO.defaultValue.get + 1) {
verifyTotalRequestedExecs(Some(3))
verifyKilledExec(None)
}
// Batch proc time slightly less than the scale up ratio, should not change allocation
- addBatchProcTimeAndVerifyAllocation(batchDurationMillis * SCALING_UP_RATIO_DEFAULT - 1) {
+ addBatchProcTimeAndVerifyAllocation(
+ batchDurationMillis * STREAMING_DYN_ALLOCATION_SCALING_UP_RATIO.defaultValue.get - 1) {
verifyTotalRequestedExecs(None)
verifyKilledExec(None)
}
// Batch proc time slightly more than the scale down ratio, should not change allocation
- addBatchProcTimeAndVerifyAllocation(batchDurationMillis * SCALING_DOWN_RATIO_DEFAULT + 1) {
+ addBatchProcTimeAndVerifyAllocation(
+ batchDurationMillis * STREAMING_DYN_ALLOCATION_SCALING_DOWN_RATIO.defaultValue.get + 1) {
verifyTotalRequestedExecs(None)
verifyKilledExec(None)
}
// Batch proc time slightly more than the scale down ratio, should not change allocation
- addBatchProcTimeAndVerifyAllocation(batchDurationMillis * SCALING_DOWN_RATIO_DEFAULT - 1) {
+ addBatchProcTimeAndVerifyAllocation(
+ batchDurationMillis * STREAMING_DYN_ALLOCATION_SCALING_DOWN_RATIO.defaultValue.get - 1) {
verifyTotalRequestedExecs(None)
verifyKilledExec(Some("2"))
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org