Posted to commits@spark.apache.org by sr...@apache.org on 2019/03/17 00:45:26 UTC

[spark] branch master updated: [SPARK-26941][YARN] Fix incorrect computation of maxNumExecutorFailures in ApplicationMaster for streaming

This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new cad475d  [SPARK-26941][YARN] Fix incorrect computation of maxNumExecutorFailures in ApplicationMaster for streaming
cad475d is described below

commit cad475dcc9376557f882859856286e858002389a
Author: Liupengcheng <li...@xiaomi.com>
AuthorDate: Sat Mar 16 19:45:05 2019 -0500

    [SPARK-26941][YARN] Fix incorrect computation of maxNumExecutorFailures in ApplicationMaster for streaming
    
    ## What changes were proposed in this pull request?
    
    Currently, when streaming dynamic allocation is enabled for streaming applications, the maxNumExecutorFailures in ApplicationMaster is still computed from `spark.dynamicAllocation.maxExecutors`.
    
    It should instead be computed from `spark.streaming.dynamicAllocation.maxExecutors`.
    
    Related code:
    https://github.com/apache/spark/blob/f87153a3acbab9260db356a451ddae86adba41b2/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala#L101
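    
    In effect, the executor count that seeds the failure tolerance now prefers the streaming setting. A minimal sketch of the corrected selection logic (illustrative only; it mirrors the ApplicationMaster change in the diff below, and omits the local-master/testing checks performed inside the `Utils` helpers):
    
    ```scala
    import org.apache.spark.SparkConf
    
    // Pick the executor count that seeds maxNumExecutorFailures.
    // Simplified sketch: the real Utils helpers also check local master / testing mode.
    def effectiveNumExecutors(conf: SparkConf): Int = {
      if (conf.getBoolean("spark.streaming.dynamicAllocation.enabled", false)) {
        // Fixed: honor the streaming dynamic allocation cap first.
        conf.getInt("spark.streaming.dynamicAllocation.maxExecutors", Int.MaxValue)
      } else if (conf.getBoolean("spark.dynamicAllocation.enabled", false)) {
        conf.getInt("spark.dynamicAllocation.maxExecutors", Int.MaxValue)
      } else {
        conf.getInt("spark.executor.instances", 0)
      }
    }
    ```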
    
    ## How was this patch tested?
    
    N/A
    
    Closes #23845 from liupc/Fix-incorrect-maxNumExecutorFailures-for-streaming.
    
    Lead-authored-by: Liupengcheng <li...@xiaomi.com>
    Co-authored-by: liupengcheng <li...@xiaomi.com>
    Signed-off-by: Sean Owen <se...@databricks.com>
---
 .../apache/spark/internal/config/Streaming.scala   | 68 ++++++++++++++++++++++
 .../main/scala/org/apache/spark/util/Utils.scala   |  7 +++
 .../spark/deploy/yarn/ApplicationMaster.scala      |  5 +-
 .../scheduler/ExecutorAllocationManager.scala      | 66 +++++----------------
 .../scheduler/ExecutorAllocationManagerSuite.scala | 17 +++---
 5 files changed, 104 insertions(+), 59 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/internal/config/Streaming.scala b/core/src/main/scala/org/apache/spark/internal/config/Streaming.scala
new file mode 100644
index 0000000..6e58c09
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/internal/config/Streaming.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.internal.config
+
+import java.util.concurrent.TimeUnit
+
+private[spark] object Streaming {
+
+  private[spark] val STREAMING_DYN_ALLOCATION_ENABLED =
+    ConfigBuilder("spark.streaming.dynamicAllocation.enabled")
+      .booleanConf
+      .createWithDefault(false)
+
+  private[spark] val STREAMING_DYN_ALLOCATION_TESTING =
+    ConfigBuilder("spark.streaming.dynamicAllocation.testing")
+      .booleanConf
+      .createWithDefault(false)
+
+  private[spark] val STREAMING_DYN_ALLOCATION_MIN_EXECUTORS =
+    ConfigBuilder("spark.streaming.dynamicAllocation.minExecutors")
+      .intConf
+      .checkValue(_ > 0, "The min executor number of streaming dynamic " +
+        "allocation must be positive.")
+      .createOptional
+
+  private[spark] val STREAMING_DYN_ALLOCATION_MAX_EXECUTORS =
+    ConfigBuilder("spark.streaming.dynamicAllocation.maxExecutors")
+      .intConf
+      .checkValue(_ > 0, "The max executor number of streaming dynamic " +
+        "allocation must be positive.")
+      .createWithDefault(Int.MaxValue)
+
+  private[spark] val STREAMING_DYN_ALLOCATION_SCALING_INTERVAL =
+    ConfigBuilder("spark.streaming.dynamicAllocation.scalingInterval")
+      .timeConf(TimeUnit.SECONDS)
+      .checkValue(_ > 0, "The scaling interval of streaming dynamic " +
+        "allocation must be positive.")
+      .createWithDefault(60)
+
+  private[spark] val STREAMING_DYN_ALLOCATION_SCALING_UP_RATIO =
+    ConfigBuilder("spark.streaming.dynamicAllocation.scalingUpRatio")
+      .doubleConf
+      .checkValue(_ > 0, "The scaling up ratio of streaming dynamic " +
+        "allocation must be positive.")
+      .createWithDefault(0.9)
+
+  private[spark] val STREAMING_DYN_ALLOCATION_SCALING_DOWN_RATIO =
+    ConfigBuilder("spark.streaming.dynamicAllocation.scalingDownRatio")
+      .doubleConf
+      .checkValue(_ > 0, "The scaling down ratio of streaming dynamic " +
+        "allocation must be positive.")
+      .createWithDefault(0.3)
+}
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 2d00453..0398625 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -60,6 +60,7 @@ import org.apache.spark._
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.internal.config._
+import org.apache.spark.internal.config.Streaming._
 import org.apache.spark.internal.config.Tests.IS_TESTING
 import org.apache.spark.internal.config.UI._
 import org.apache.spark.internal.config.Worker._
@@ -2490,6 +2491,12 @@ private[spark] object Utils extends Logging {
       (!isLocalMaster(conf) || conf.get(DYN_ALLOCATION_TESTING))
   }
 
+  def isStreamingDynamicAllocationEnabled(conf: SparkConf): Boolean = {
+    val streamingDynamicAllocationEnabled = conf.get(STREAMING_DYN_ALLOCATION_ENABLED)
+    streamingDynamicAllocationEnabled &&
+      (!isLocalMaster(conf) || conf.get(STREAMING_DYN_ALLOCATION_TESTING))
+  }
+
   /**
    * Return the initial number of executors for dynamic allocation.
    */
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 91e1fbd..9ed3b78 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -45,6 +45,7 @@ import org.apache.spark.deploy.history.HistoryServer
 import org.apache.spark.deploy.yarn.config._
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
+import org.apache.spark.internal.config.Streaming.STREAMING_DYN_ALLOCATION_MAX_EXECUTORS
 import org.apache.spark.internal.config.UI._
 import org.apache.spark.metrics.{MetricsSystem, MetricsSystemInstances}
 import org.apache.spark.rpc._
@@ -100,7 +101,9 @@ private[spark] class ApplicationMaster(
 
   private val maxNumExecutorFailures = {
     val effectiveNumExecutors =
-      if (Utils.isDynamicAllocationEnabled(sparkConf)) {
+      if (Utils.isStreamingDynamicAllocationEnabled(sparkConf)) {
+        sparkConf.get(STREAMING_DYN_ALLOCATION_MAX_EXECUTORS)
+      } else if (Utils.isDynamicAllocationEnabled(sparkConf)) {
         sparkConf.get(DYN_ALLOCATION_MAX_EXECUTORS)
       } else {
         sparkConf.get(EXECUTOR_INSTANCES).getOrElse(0)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala
index 8717555..e85a3b9 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala
@@ -22,6 +22,7 @@ import scala.util.Random
 
 import org.apache.spark.{ExecutorAllocationClient, SparkConf}
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.Streaming._
 import org.apache.spark.streaming.util.RecurringTimer
 import org.apache.spark.util.{Clock, Utils}
 
@@ -55,17 +56,12 @@ private[streaming] class ExecutorAllocationManager(
     batchDurationMs: Long,
     clock: Clock) extends StreamingListener with Logging {
 
-  import ExecutorAllocationManager._
-
-  private val scalingIntervalSecs = conf.getTimeAsSeconds(
-    SCALING_INTERVAL_KEY,
-    s"${SCALING_INTERVAL_DEFAULT_SECS}s")
-  private val scalingUpRatio = conf.getDouble(SCALING_UP_RATIO_KEY, SCALING_UP_RATIO_DEFAULT)
-  private val scalingDownRatio = conf.getDouble(SCALING_DOWN_RATIO_KEY, SCALING_DOWN_RATIO_DEFAULT)
-  private val minNumExecutors = conf.getInt(
-    MIN_EXECUTORS_KEY,
-    math.max(1, receiverTracker.numReceivers))
-  private val maxNumExecutors = conf.getInt(MAX_EXECUTORS_KEY, Integer.MAX_VALUE)
+  private val scalingIntervalSecs = conf.get(STREAMING_DYN_ALLOCATION_SCALING_INTERVAL)
+  private val scalingUpRatio = conf.get(STREAMING_DYN_ALLOCATION_SCALING_UP_RATIO)
+  private val scalingDownRatio = conf.get(STREAMING_DYN_ALLOCATION_SCALING_DOWN_RATIO)
+  private val minNumExecutors = conf.get(STREAMING_DYN_ALLOCATION_MIN_EXECUTORS)
+    .getOrElse(math.max(1, receiverTracker.numReceivers()))
+  private val maxNumExecutors = conf.get(STREAMING_DYN_ALLOCATION_MAX_EXECUTORS)
   private val timer = new RecurringTimer(clock, scalingIntervalSecs * 1000,
     _ => manageAllocation(), "streaming-executor-allocation-manager")
 
@@ -151,33 +147,16 @@ private[streaming] class ExecutorAllocationManager(
 
   private def validateSettings(): Unit = {
     require(
-      scalingIntervalSecs > 0,
-      s"Config $SCALING_INTERVAL_KEY must be more than 0")
-
-    require(
-      scalingUpRatio > 0,
-      s"Config $SCALING_UP_RATIO_KEY must be more than 0")
-
-    require(
-      scalingDownRatio > 0,
-      s"Config $SCALING_DOWN_RATIO_KEY must be more than 0")
-
-    require(
-      minNumExecutors > 0,
-      s"Config $MIN_EXECUTORS_KEY must be more than 0")
-
-    require(
-      maxNumExecutors > 0,
-      s"$MAX_EXECUTORS_KEY must be more than 0")
-
-    require(
       scalingUpRatio > scalingDownRatio,
-      s"Config $SCALING_UP_RATIO_KEY must be more than config $SCALING_DOWN_RATIO_KEY")
+      s"Config ${STREAMING_DYN_ALLOCATION_SCALING_UP_RATIO.key} must be more than config " +
+        s"${STREAMING_DYN_ALLOCATION_SCALING_DOWN_RATIO.key}")
 
-    if (conf.contains(MIN_EXECUTORS_KEY) && conf.contains(MAX_EXECUTORS_KEY)) {
+    if (conf.contains(STREAMING_DYN_ALLOCATION_MIN_EXECUTORS.key) &&
+      conf.contains(STREAMING_DYN_ALLOCATION_MAX_EXECUTORS.key)) {
       require(
         maxNumExecutors >= minNumExecutors,
-        s"Config $MAX_EXECUTORS_KEY must be more than config $MIN_EXECUTORS_KEY")
+        s"Config ${STREAMING_DYN_ALLOCATION_MAX_EXECUTORS.key} must be more than config " +
+          s"${STREAMING_DYN_ALLOCATION_MIN_EXECUTORS.key}")
     }
   }
 
@@ -190,23 +169,9 @@ private[streaming] class ExecutorAllocationManager(
 }
 
 private[streaming] object ExecutorAllocationManager extends Logging {
-  val ENABLED_KEY = "spark.streaming.dynamicAllocation.enabled"
-
-  val SCALING_INTERVAL_KEY = "spark.streaming.dynamicAllocation.scalingInterval"
-  val SCALING_INTERVAL_DEFAULT_SECS = 60
-
-  val SCALING_UP_RATIO_KEY = "spark.streaming.dynamicAllocation.scalingUpRatio"
-  val SCALING_UP_RATIO_DEFAULT = 0.9
-
-  val SCALING_DOWN_RATIO_KEY = "spark.streaming.dynamicAllocation.scalingDownRatio"
-  val SCALING_DOWN_RATIO_DEFAULT = 0.3
-
-  val MIN_EXECUTORS_KEY = "spark.streaming.dynamicAllocation.minExecutors"
-
-  val MAX_EXECUTORS_KEY = "spark.streaming.dynamicAllocation.maxExecutors"
 
   def isDynamicAllocationEnabled(conf: SparkConf): Boolean = {
-    val streamingDynamicAllocationEnabled = conf.getBoolean(ENABLED_KEY, false)
+    val streamingDynamicAllocationEnabled = Utils.isStreamingDynamicAllocationEnabled(conf)
     if (Utils.isDynamicAllocationEnabled(conf) && streamingDynamicAllocationEnabled) {
       throw new IllegalArgumentException(
         """
@@ -215,8 +180,7 @@ private[streaming] object ExecutorAllocationManager extends Logging {
           |false to use Dynamic Allocation in streaming.
         """.stripMargin)
     }
-    val testing = conf.getBoolean("spark.streaming.dynamicAllocation.testing", false)
-    streamingDynamicAllocationEnabled && (!Utils.isLocalMaster(conf) || testing)
+    streamingDynamicAllocationEnabled
   }
 
   def createIfEnabled(
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala
index fcbba00..22d027d 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala
@@ -26,6 +26,7 @@ import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.{ExecutorAllocationClient, SparkConf, SparkFunSuite}
 import org.apache.spark.internal.config.{DYN_ALLOCATION_ENABLED, DYN_ALLOCATION_TESTING}
+import org.apache.spark.internal.config.Streaming._
 import org.apache.spark.streaming.{DummyInputDStream, Seconds, StreamingContext}
 import org.apache.spark.util.{ManualClock, Utils}
 
@@ -33,8 +34,6 @@ import org.apache.spark.util.{ManualClock, Utils}
 class ExecutorAllocationManagerSuite extends SparkFunSuite
   with BeforeAndAfter with BeforeAndAfterAll with MockitoSugar with PrivateMethodTester {
 
-  import ExecutorAllocationManager._
-
   private val batchDurationMillis = 1000L
   private var allocationClient: ExecutorAllocationClient = null
   private var clock: StreamManualClock = null
@@ -58,7 +57,7 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite
         reset(allocationClient)
         when(allocationClient.getExecutorIds()).thenReturn(Seq("1", "2"))
         addBatchProcTime(allocationManager, batchProcTimeMs.toLong)
-        val advancedTime = SCALING_INTERVAL_DEFAULT_SECS * 1000 + 1
+        val advancedTime = STREAMING_DYN_ALLOCATION_SCALING_INTERVAL.defaultValue.get * 1000 + 1
         val expectedWaitTime = clock.getTimeMillis() + advancedTime
         clock.advance(advancedTime)
         // Make sure ExecutorAllocationManager.manageAllocation is called
@@ -101,25 +100,29 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite
       }
 
       // Batch proc time slightly more than the scale up ratio, should increase allocation by 1
-      addBatchProcTimeAndVerifyAllocation(batchDurationMillis * SCALING_UP_RATIO_DEFAULT + 1) {
+      addBatchProcTimeAndVerifyAllocation(
+        batchDurationMillis * STREAMING_DYN_ALLOCATION_SCALING_UP_RATIO.defaultValue.get + 1) {
         verifyTotalRequestedExecs(Some(3))
         verifyKilledExec(None)
       }
 
       // Batch proc time slightly less than the scale up ratio, should not change allocation
-      addBatchProcTimeAndVerifyAllocation(batchDurationMillis * SCALING_UP_RATIO_DEFAULT - 1) {
+      addBatchProcTimeAndVerifyAllocation(
+        batchDurationMillis * STREAMING_DYN_ALLOCATION_SCALING_UP_RATIO.defaultValue.get - 1) {
         verifyTotalRequestedExecs(None)
         verifyKilledExec(None)
       }
 
       // Batch proc time slightly more than the scale down ratio, should not change allocation
-      addBatchProcTimeAndVerifyAllocation(batchDurationMillis * SCALING_DOWN_RATIO_DEFAULT + 1) {
+      addBatchProcTimeAndVerifyAllocation(
+        batchDurationMillis * STREAMING_DYN_ALLOCATION_SCALING_DOWN_RATIO.defaultValue.get + 1) {
         verifyTotalRequestedExecs(None)
         verifyKilledExec(None)
       }
 
      // Batch proc time slightly less than the scale down ratio, should decrease allocation
-      addBatchProcTimeAndVerifyAllocation(batchDurationMillis * SCALING_DOWN_RATIO_DEFAULT - 1) {
+      addBatchProcTimeAndVerifyAllocation(
+        batchDurationMillis * STREAMING_DYN_ALLOCATION_SCALING_DOWN_RATIO.defaultValue.get - 1) {
         verifyTotalRequestedExecs(None)
         verifyKilledExec(Some("2"))
       }


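For context, the new configuration keys introduced above live under `spark.streaming.dynamicAllocation.*` and are set like any other Spark conf. A minimal sketch with illustrative values (note that `ExecutorAllocationManager.isDynamicAllocationEnabled`, shown in the diff, rejects enabling core and streaming dynamic allocation at the same time):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Illustrative values only; defaults come from config/Streaming.scala above.
val conf = new SparkConf()
  .setAppName("streaming-dra-example")
  .set("spark.dynamicAllocation.enabled", "false")  // core dynamic allocation must stay off
  .set("spark.streaming.dynamicAllocation.enabled", "true")
  .set("spark.streaming.dynamicAllocation.minExecutors", "2")
  .set("spark.streaming.dynamicAllocation.maxExecutors", "20")
  .set("spark.streaming.dynamicAllocation.scalingInterval", "60s")

val ssc = new StreamingContext(conf, Seconds(5))
```

With this patch, the `maxExecutors` value above (20), rather than `spark.dynamicAllocation.maxExecutors`, is what the YARN ApplicationMaster uses when deriving maxNumExecutorFailures.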