You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by tdas <gi...@git.apache.org> on 2016/04/04 22:52:29 UTC

[GitHub] spark pull request: Streaming dynamic allocation

GitHub user tdas opened a pull request:

    https://github.com/apache/spark/pull/12154

    Streaming dynamic allocation

    ## What changes were proposed in this pull request?
    
    Added a new Executor Allocation Manager for the Streaming scheduler for doing Streaming Dynamic Allocation.
    
    ## How was this patch tested
    Unit tests, and cluster tests.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tdas/spark streaming-dynamic-allocation

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/12154.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #12154
    
----
commit 26c0c18d6a411171be36049bec8631b670c9cba4
Author: Tathagata Das <ta...@gmail.com>
Date:   2015-12-04T01:46:14Z

    Implement streaming dynamic allocation

commit 0e78bd2dea8a1f510e396ba9daf14177eb90b539
Author: Tathagata Das <ta...@gmail.com>
Date:   2016-03-16T01:42:59Z

    Some change

commit 60b7a2262664fc8ef6c4aa394e2e246f3fd8cb2d
Author: Tathagata Das <ta...@gmail.com>
Date:   2016-04-04T09:14:17Z

    Fixed bugs and updated tests

commit 39ed35a85224cdb73414a33a5a6f352e3ae221ea
Author: Tathagata Das <ta...@gmail.com>
Date:   2016-04-04T20:28:56Z

    Added enabling key and unit tests

commit 81ad1dd3ce7a87f81f317ff330cca215e8d4fc3a
Author: Tathagata Das <ta...@gmail.com>
Date:   2016-04-04T20:50:56Z

    Added docs

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12133][STREAMING] Streaming dynamic all...

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12154#discussion_r58637184
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala ---
    @@ -0,0 +1,217 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +
    +package org.apache.spark.streaming.scheduler
    +
    +import scala.util.Random
    +
    +import org.apache.spark.{ExecutorAllocationClient, SparkConf}
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.streaming.util.RecurringTimer
    +import org.apache.spark.util.{Clock, Utils}
    +
    +/**
    + * Class that manages executor allocated to a StreamingContext, and dynamically request or kill
    + * executors based on the statistics of the streaming computation. At a high level, the policy is:
    + * - Use StreamingListener interface get batch processing times of completed batches
    + * - Periodically take the average batch completion times and compare with the batch interval
    + * - If (avg. proc. time / batch interval) >= scaling up ratio, then request more executors
    + * - If (avg. proc. time / batch interval) <= scaling down ratio, then try to kill a executor that
    + *   is not running a receiver
    + */
    +private[streaming] class ExecutorAllocationManager(
    +    client: ExecutorAllocationClient,
    +    receiverTracker: ReceiverTracker,
    +    conf: SparkConf,
    +    batchDurationMs: Long,
    +    clock: Clock) extends StreamingListener with Logging {
    +
    +  import ExecutorAllocationManager._
    +
    +  private val scalingIntervalSecs = conf.getTimeAsSeconds(
    +    SCALING_INTERVAL_KEY,
    +    s"${SCALING_INTERVAL_DEFAULT_SECS}s")
    +  private val scalingUpRatio = conf.getDouble(SCALING_UP_RATIO_KEY, SCALING_UP_RATIO_DEFAULT)
    +  private val scalingDownRatio = conf.getDouble(SCALING_DOWN_RATIO_KEY, SCALING_DOWN_RATIO_DEFAULT)
    +  private val minNumExecutors = conf.getInt(
    +    MIN_EXECUTORS_KEY,
    +    math.max(1, receiverTracker.numReceivers))
    +  private val maxNumExecutors = conf.getInt(MAX_EXECUTORS_KEY, Integer.MAX_VALUE)
    +  private val timer = new RecurringTimer(clock, scalingIntervalSecs * 1000,
    +    _ => manageAllocation(), "streaming-executor-allocation-manager")
    +
    +  @volatile private var batchProcTimeSum = 0L
    +  @volatile private var batchProcTimeCount = 0
    +
    +  validateSettings()
    +
    +  def start(): Unit = {
    +    timer.start()
    +    logInfo(s"ExecutorAllocationManager started with " +
    +      s"ratios = [$scalingUpRatio, $scalingDownRatio] and interval = $scalingIntervalSecs sec")
    +  }
    +
    +  def stop(): Unit = {
    +    timer.stop(interruptTimer = true)
    +    logInfo("ExecutorAllocationManager stopped")
    +  }
    +
    +  private def manageAllocation(): Unit = synchronized {
    +    logInfo(s"Managing executor allocation with ratios = [$scalingUpRatio, $scalingDownRatio]")
    +    if (batchProcTimeCount > 0) {
    +      val averageBatchProcTime = batchProcTimeSum / batchProcTimeCount
    +      val ratio = averageBatchProcTime.toDouble / batchDurationMs
    +      logInfo(s"Average: $averageBatchProcTime, ratio = $ratio" )
    +      if (ratio >= scalingUpRatio) {
    +        logDebug("Requesting executors")
    +        val numNewExecutors = math.max(math.round(ratio).toInt, 1)
    +        requestExecutors(numNewExecutors)
    +      } else if (ratio <= scalingDownRatio) {
    +        logDebug("Killing executors")
    +        killExecutor()
    +      }
    +    }
    +    batchProcTimeSum = 0
    +    batchProcTimeCount = 0
    +  }
    +
    +  private def requestExecutors(numNewExecutors: Int): Unit = {
    +    require(numNewExecutors >= 1)
    +    val allExecIds = client.getExecutorIds()
    +    logDebug(s"Executors (${allExecIds.size}) = ${allExecIds}")
    +    val targetTotalExecutors =
    +      math.max(math.min(maxNumExecutors, allExecIds.size + numNewExecutors), minNumExecutors)
    +    client.requestTotalExecutors(targetTotalExecutors, 0, Map.empty)
    +    logInfo(s"Requested total $targetTotalExecutors executors")
    +  }
    +
    +  private def killExecutor(): Unit = {
    +    val allExecIds = client.getExecutorIds()
    +    logDebug(s"Executors (${allExecIds.size}) = ${allExecIds}")
    +
    +    if (allExecIds.nonEmpty && allExecIds.size > minNumExecutors) {
    +      val execIdsWithReceivers = receiverTracker.getAllocatedExecutors.values.flatten.toSeq
    +      logInfo(s"Executors with receivers (${execIdsWithReceivers.size}): ${execIdsWithReceivers}")
    +
    +      val removableExecIds = allExecIds.diff(execIdsWithReceivers)
    +      logDebug(s"Removable executors (${removableExecIds.size}): ${removableExecIds}")
    +      if (removableExecIds.nonEmpty) {
    +        val execIdToRemove = removableExecIds(Random.nextInt(removableExecIds.size))
    +        client.killExecutor(execIdToRemove)
    +        logInfo(s"Requested to kill executor $execIdToRemove")
    +      } else {
    +        logInfo(s"No non-receiver executors to kill")
    +      }
    +    } else {
    +      logInfo("No available executor to kill")
    +    }
    +  }
    +
    +  private def addBatchProcTime(timeMs: Long): Unit = synchronized {
    +    batchProcTimeSum += timeMs
    +    batchProcTimeCount += 1
    +    logDebug(
    +      s"Added batch processing time $timeMs, sum = $batchProcTimeSum, count = $batchProcTimeCount")
    +  }
    +
    +  private def validateSettings(): Unit = {
    +    require(
    +      scalingIntervalSecs > 0,
    +      s"Config $SCALING_INTERVAL_KEY must be more than 0")
    +
    +    require(
    +      scalingUpRatio > 0,
    +      s"Config $SCALING_UP_RATIO_KEY must be more than 0")
    +
    +    require(
    +      scalingDownRatio > 0,
    +      s"Config $SCALING_DOWN_RATIO_KEY must be more than 0")
    +
    +    require(
    +      minNumExecutors > 0,
    +      s"Config $MIN_EXECUTORS_KEY must be more than 0")
    +
    +    require(
    +      maxNumExecutors > 0,
    +      s"$MAX_EXECUTORS_KEY must be more than 0")
    +
    +    require(
    +      scalingUpRatio > scalingDownRatio,
    +      s"Config $SCALING_UP_RATIO_KEY must be more than config $SCALING_DOWN_RATIO_KEY")
    +
    +    if (conf.contains(MIN_EXECUTORS_KEY) && conf.contains(MAX_EXECUTORS_KEY)) {
    +      require(
    +        maxNumExecutors >= minNumExecutors,
    +        s"Config $MAX_EXECUTORS_KEY must be more than config $MIN_EXECUTORS_KEY")
    +    }
    +  }
    +
    +  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    +    logDebug("onBatchCompleted called: " + batchCompleted)
    +    if (!batchCompleted.batchInfo.outputOperationInfos.values.exists(_.failureReason.nonEmpty)) {
    +      batchCompleted.batchInfo.processingDelay.foreach(addBatchProcTime)
    +    }
    +  }
    +}
    +
    +private[streaming] object ExecutorAllocationManager extends Logging {
    +  val ENABLED_KEY = "spark.streaming.dynamicAllocation.enabled"
    +
    +  val SCALING_INTERVAL_KEY = "spark.streaming.dynamicAllocation.scalingInterval"
    +  val SCALING_INTERVAL_DEFAULT_SECS = 60
    +
    +  val SCALING_UP_RATIO_KEY = "spark.streaming.dynamicAllocation.scalingUpRatio"
    +  val SCALING_UP_RATIO_DEFAULT = 0.9
    +
    +  val SCALING_DOWN_RATIO_KEY = "spark.streaming.dynamicAllocation.scalingDownRatio"
    +  val SCALING_DOWN_RATIO_DEFAULT = 0.3
    +
    +  val MIN_EXECUTORS_KEY = "spark.streaming.dynamicAllocation.minExecutors"
    +
    +  val MAX_EXECUTORS_KEY = "spark.streaming.dynamicAllocation.maxExecutors"
    --- End diff --
    
    Can we use these two configurations `minExecutors` and `maxExecutors` derived from Spark `ExecutorAllocationManager`?
    
    Basically is there any semantic difference for min and max executors between here and Spark's dynamic allocation?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12133][STREAMING] Streaming dynamic all...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/12154#issuecomment-206568667
  
    Merged build finished. Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12133][STREAMING] Streaming dynamic all...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/12154#issuecomment-206567947
  
    **[Test build #55136 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/55136/consoleFull)** for PR 12154 at commit [`0598c85`](https://github.com/apache/spark/commit/0598c850b374384d9b45a6707817b39bde279db9).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12133][STREAMING] Streaming dynamic all...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12154#discussion_r58496446
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala ---
    @@ -0,0 +1,217 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +
    +package org.apache.spark.streaming.scheduler
    +
    +import scala.util.Random
    +
    +import org.apache.spark.{ExecutorAllocationClient, SparkConf}
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.streaming.util.RecurringTimer
    +import org.apache.spark.util.{Clock, Utils}
    +
    +/**
    + * Class that manages executor allocated to a StreamingContext, and dynamically request or kill
    + * executors based on the statistics of the streaming computation. At a high level, the policy is:
    + * - Use StreamingListener interface get batch processing times of completed batches
    + * - Periodically take the average batch completion times and compare with the batch interval
    + * - If (avg. proc. time / batch interval) >= scaling up ratio, then request more executors
    + * - If (avg. proc. time / batch interval) <= scaling down ratio, then try to kill a executor that
    + *   is not running a receiver
    + */
    +private[streaming] class ExecutorAllocationManager(
    +    client: ExecutorAllocationClient,
    +    receiverTracker: ReceiverTracker,
    +    conf: SparkConf,
    +    batchDurationMs: Long,
    +    clock: Clock) extends StreamingListener with Logging {
    +
    +  import ExecutorAllocationManager._
    +
    +  private val scalingIntervalSecs = conf.getTimeAsSeconds(
    +    SCALING_INTERVAL_KEY,
    +    s"${SCALING_INTERVAL_DEFAULT_SECS}s")
    +  private val scalingUpRatio = conf.getDouble(SCALING_UP_RATIO_KEY, SCALING_UP_RATIO_DEFAULT)
    +  private val scalingDownRatio = conf.getDouble(SCALING_DOWN_RATIO_KEY, SCALING_DOWN_RATIO_DEFAULT)
    +  private val minNumExecutors = conf.getInt(
    +    MIN_EXECUTORS_KEY,
    +    math.max(1, receiverTracker.numReceivers))
    +  private val maxNumExecutors = conf.getInt(MAX_EXECUTORS_KEY, Integer.MAX_VALUE)
    +  private val timer = new RecurringTimer(clock, scalingIntervalSecs * 1000,
    +    _ => manageAllocation(), "streaming-executor-allocation-manager")
    +
    +  @volatile private var batchProcTimeSum = 0L
    +  @volatile private var batchProcTimeCount = 0
    +
    +  validateSettings()
    +
    +  def start(): Unit = {
    +    timer.start()
    +    logInfo(s"ExecutorAllocationManager started with " +
    +      s"ratios = [$scalingUpRatio, $scalingDownRatio] and interval = $scalingIntervalSecs sec")
    +  }
    +
    +  def stop(): Unit = {
    +    timer.stop(interruptTimer = true)
    +    logInfo("ExecutorAllocationManager stopped")
    +  }
    +
    +  private def manageAllocation(): Unit = synchronized {
    +    logInfo(s"Managing executor allocation with ratios = [$scalingUpRatio, $scalingDownRatio]")
    +    if (batchProcTimeCount > 0) {
    +      val averageBatchProcTime = batchProcTimeSum / batchProcTimeCount
    +      val ratio = averageBatchProcTime.toDouble / batchDurationMs
    +      logInfo(s"Average: $averageBatchProcTime, ratio = $ratio" )
    +      if (ratio >= scalingUpRatio) {
    +        logDebug("Requesting executors")
    +        val numNewExecutors = math.max(math.round(ratio).toInt, 1)
    +        requestExecutors(numNewExecutors)
    +      } else if (ratio <= scalingDownRatio) {
    +        logDebug("Killing executors")
    +        killExecutor()
    +      }
    +    }
    +    batchProcTimeSum = 0
    +    batchProcTimeCount = 0
    +  }
    +
    +  private def requestExecutors(numNewExecutors: Int): Unit = {
    +    require(numNewExecutors >= 1)
    +    val allExecIds = client.getExecutorIds()
    +    logDebug(s"Executors (${allExecIds.size}) = ${allExecIds}")
    +    val targetTotalExecutors =
    +      math.max(math.min(maxNumExecutors, allExecIds.size + numNewExecutors), minNumExecutors)
    --- End diff --
    
    might be easier to read if it's
    ```
    val targetTotalExecutors =
      math.max(minNumExecutors,
        math.min(maxNumExecutors, allExecIds.size + numNewExecutors))
    ```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-XXX][STREAMING] Streaming dynamic alloc...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on the pull request:

    https://github.com/apache/spark/pull/12154#issuecomment-205493533
  
    12133


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #12154: [SPARK-12133][STREAMING] Streaming dynamic alloca...

Posted by sansagara <gi...@git.apache.org>.
Github user sansagara commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12154#discussion_r177898973
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala ---
    @@ -0,0 +1,217 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +
    +package org.apache.spark.streaming.scheduler
    +
    +import scala.util.Random
    +
    +import org.apache.spark.{ExecutorAllocationClient, SparkConf}
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.streaming.util.RecurringTimer
    +import org.apache.spark.util.{Clock, Utils}
    +
    +/**
    + * Class that manages executor allocated to a StreamingContext, and dynamically request or kill
    + * executors based on the statistics of the streaming computation. At a high level, the policy is:
    + * - Use StreamingListener interface get batch processing times of completed batches
    + * - Periodically take the average batch completion times and compare with the batch interval
    + * - If (avg. proc. time / batch interval) >= scaling up ratio, then request more executors
    + * - If (avg. proc. time / batch interval) <= scaling down ratio, then try to kill a executor that
    + *   is not running a receiver
    + */
    +private[streaming] class ExecutorAllocationManager(
    +    client: ExecutorAllocationClient,
    +    receiverTracker: ReceiverTracker,
    +    conf: SparkConf,
    +    batchDurationMs: Long,
    +    clock: Clock) extends StreamingListener with Logging {
    +
    +  import ExecutorAllocationManager._
    +
    +  private val scalingIntervalSecs = conf.getTimeAsSeconds(
    +    SCALING_INTERVAL_KEY,
    +    s"${SCALING_INTERVAL_DEFAULT_SECS}s")
    +  private val scalingUpRatio = conf.getDouble(SCALING_UP_RATIO_KEY, SCALING_UP_RATIO_DEFAULT)
    +  private val scalingDownRatio = conf.getDouble(SCALING_DOWN_RATIO_KEY, SCALING_DOWN_RATIO_DEFAULT)
    +  private val minNumExecutors = conf.getInt(
    +    MIN_EXECUTORS_KEY,
    +    math.max(1, receiverTracker.numReceivers))
    +  private val maxNumExecutors = conf.getInt(MAX_EXECUTORS_KEY, Integer.MAX_VALUE)
    +  private val timer = new RecurringTimer(clock, scalingIntervalSecs * 1000,
    +    _ => manageAllocation(), "streaming-executor-allocation-manager")
    +
    +  @volatile private var batchProcTimeSum = 0L
    +  @volatile private var batchProcTimeCount = 0
    +
    +  validateSettings()
    +
    +  def start(): Unit = {
    +    timer.start()
    +    logInfo(s"ExecutorAllocationManager started with " +
    +      s"ratios = [$scalingUpRatio, $scalingDownRatio] and interval = $scalingIntervalSecs sec")
    +  }
    +
    +  def stop(): Unit = {
    +    timer.stop(interruptTimer = true)
    +    logInfo("ExecutorAllocationManager stopped")
    +  }
    +
    +  private def manageAllocation(): Unit = synchronized {
    +    logInfo(s"Managing executor allocation with ratios = [$scalingUpRatio, $scalingDownRatio]")
    +    if (batchProcTimeCount > 0) {
    +      val averageBatchProcTime = batchProcTimeSum / batchProcTimeCount
    +      val ratio = averageBatchProcTime.toDouble / batchDurationMs
    +      logInfo(s"Average: $averageBatchProcTime, ratio = $ratio" )
    +      if (ratio >= scalingUpRatio) {
    +        logDebug("Requesting executors")
    +        val numNewExecutors = math.max(math.round(ratio).toInt, 1)
    +        requestExecutors(numNewExecutors)
    +      } else if (ratio <= scalingDownRatio) {
    +        logDebug("Killing executors")
    +        killExecutor()
    +      }
    +    }
    +    batchProcTimeSum = 0
    +    batchProcTimeCount = 0
    +  }
    +
    +  private def requestExecutors(numNewExecutors: Int): Unit = {
    +    require(numNewExecutors >= 1)
    +    val allExecIds = client.getExecutorIds()
    +    logDebug(s"Executors (${allExecIds.size}) = ${allExecIds}")
    +    val targetTotalExecutors =
    +      math.max(math.min(maxNumExecutors, allExecIds.size + numNewExecutors), minNumExecutors)
    +    client.requestTotalExecutors(targetTotalExecutors, 0, Map.empty)
    +    logInfo(s"Requested total $targetTotalExecutors executors")
    +  }
    +
    +  private def killExecutor(): Unit = {
    +    val allExecIds = client.getExecutorIds()
    +    logDebug(s"Executors (${allExecIds.size}) = ${allExecIds}")
    +
    +    if (allExecIds.nonEmpty && allExecIds.size > minNumExecutors) {
    +      val execIdsWithReceivers = receiverTracker.getAllocatedExecutors.values.flatten.toSeq
    +      logInfo(s"Executors with receivers (${execIdsWithReceivers.size}): ${execIdsWithReceivers}")
    +
    +      val removableExecIds = allExecIds.diff(execIdsWithReceivers)
    +      logDebug(s"Removable executors (${removableExecIds.size}): ${removableExecIds}")
    +      if (removableExecIds.nonEmpty) {
    +        val execIdToRemove = removableExecIds(Random.nextInt(removableExecIds.size))
    +        client.killExecutor(execIdToRemove)
    +        logInfo(s"Requested to kill executor $execIdToRemove")
    +      } else {
    +        logInfo(s"No non-receiver executors to kill")
    +      }
    +    } else {
    +      logInfo("No available executor to kill")
    +    }
    +  }
    +
    +  private def addBatchProcTime(timeMs: Long): Unit = synchronized {
    +    batchProcTimeSum += timeMs
    +    batchProcTimeCount += 1
    +    logDebug(
    +      s"Added batch processing time $timeMs, sum = $batchProcTimeSum, count = $batchProcTimeCount")
    +  }
    +
    +  private def validateSettings(): Unit = {
    +    require(
    +      scalingIntervalSecs > 0,
    +      s"Config $SCALING_INTERVAL_KEY must be more than 0")
    +
    +    require(
    +      scalingUpRatio > 0,
    +      s"Config $SCALING_UP_RATIO_KEY must be more than 0")
    +
    +    require(
    +      scalingDownRatio > 0,
    +      s"Config $SCALING_DOWN_RATIO_KEY must be more than 0")
    +
    +    require(
    +      minNumExecutors > 0,
    +      s"Config $MIN_EXECUTORS_KEY must be more than 0")
    +
    +    require(
    +      maxNumExecutors > 0,
    +      s"$MAX_EXECUTORS_KEY must be more than 0")
    +
    +    require(
    +      scalingUpRatio > scalingDownRatio,
    +      s"Config $SCALING_UP_RATIO_KEY must be more than config $SCALING_DOWN_RATIO_KEY")
    +
    +    if (conf.contains(MIN_EXECUTORS_KEY) && conf.contains(MAX_EXECUTORS_KEY)) {
    +      require(
    +        maxNumExecutors >= minNumExecutors,
    +        s"Config $MAX_EXECUTORS_KEY must be more than config $MIN_EXECUTORS_KEY")
    +    }
    +  }
    +
    +  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    +    logDebug("onBatchCompleted called: " + batchCompleted)
    +    if (!batchCompleted.batchInfo.outputOperationInfos.values.exists(_.failureReason.nonEmpty)) {
    +      batchCompleted.batchInfo.processingDelay.foreach(addBatchProcTime)
    +    }
    +  }
    +}
    +
    +private[streaming] object ExecutorAllocationManager extends Logging {
    +  val ENABLED_KEY = "spark.streaming.dynamicAllocation.enabled"
    +
    +  val SCALING_INTERVAL_KEY = "spark.streaming.dynamicAllocation.scalingInterval"
    +  val SCALING_INTERVAL_DEFAULT_SECS = 60
    +
    +  val SCALING_UP_RATIO_KEY = "spark.streaming.dynamicAllocation.scalingUpRatio"
    +  val SCALING_UP_RATIO_DEFAULT = 0.9
    +
    +  val SCALING_DOWN_RATIO_KEY = "spark.streaming.dynamicAllocation.scalingDownRatio"
    +  val SCALING_DOWN_RATIO_DEFAULT = 0.3
    +
    +  val MIN_EXECUTORS_KEY = "spark.streaming.dynamicAllocation.minExecutors"
    +
    +  val MAX_EXECUTORS_KEY = "spark.streaming.dynamicAllocation.maxExecutors"
    --- End diff --
    
    @tdas @andrewor14 I also have to ask: Any reason `initExecutors ` is not supported for streaming with dynamic allocation? I'm having issues with my application because it needs a minimum executors count to start behaving good with the Kinesis stream.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #12154: [SPARK-12133][STREAMING] Streaming dynamic allocation

Posted by sansagara <gi...@git.apache.org>.
Github user sansagara commented on the issue:

    https://github.com/apache/spark/pull/12154
  
    Is there a way to specify the Initial executors?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12133][STREAMING] Streaming dynamic all...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12154#discussion_r58461720
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala ---
    @@ -43,7 +43,7 @@ import org.apache.spark.storage.StorageLevel
     import org.apache.spark.streaming.StreamingContextState._
     import org.apache.spark.streaming.dstream._
     import org.apache.spark.streaming.receiver.Receiver
    -import org.apache.spark.streaming.scheduler.{JobScheduler, StreamingListener}
    +import org.apache.spark.streaming.scheduler.{ExecutorAllocationManager, JobScheduler, StreamingListener}
    --- End diff --
    
    I just pushed some more changes. Its now needed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12133][STREAMING] Streaming dynamic all...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on the pull request:

    https://github.com/apache/spark/pull/12154#issuecomment-206513739
  
    LGTM


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12133][STREAMING] Streaming dynamic all...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/12154#issuecomment-205532470
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/54893/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12133][STREAMING] Streaming dynamic all...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on the pull request:

    https://github.com/apache/spark/pull/12154#issuecomment-206601300
  
    Merged into master thanks.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12133][STREAMING] Streaming dynamic all...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12154#discussion_r58758160
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala ---
    @@ -0,0 +1,217 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +
    +package org.apache.spark.streaming.scheduler
    +
    +import scala.util.Random
    +
    +import org.apache.spark.{ExecutorAllocationClient, SparkConf}
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.streaming.util.RecurringTimer
    +import org.apache.spark.util.{Clock, Utils}
    +
    +/**
    + * Class that manages executor allocated to a StreamingContext, and dynamically request or kill
    + * executors based on the statistics of the streaming computation. At a high level, the policy is:
    + * - Use StreamingListener interface get batch processing times of completed batches
    + * - Periodically take the average batch completion times and compare with the batch interval
    + * - If (avg. proc. time / batch interval) >= scaling up ratio, then request more executors
    + * - If (avg. proc. time / batch interval) <= scaling down ratio, then try to kill a executor that
    + *   is not running a receiver
    + */
    +private[streaming] class ExecutorAllocationManager(
    +    client: ExecutorAllocationClient,
    +    receiverTracker: ReceiverTracker,
    +    conf: SparkConf,
    +    batchDurationMs: Long,
    +    clock: Clock) extends StreamingListener with Logging {
    +
    +  import ExecutorAllocationManager._
    +
    +  private val scalingIntervalSecs = conf.getTimeAsSeconds(
    +    SCALING_INTERVAL_KEY,
    +    s"${SCALING_INTERVAL_DEFAULT_SECS}s")
    +  private val scalingUpRatio = conf.getDouble(SCALING_UP_RATIO_KEY, SCALING_UP_RATIO_DEFAULT)
    +  private val scalingDownRatio = conf.getDouble(SCALING_DOWN_RATIO_KEY, SCALING_DOWN_RATIO_DEFAULT)
    +  private val minNumExecutors = conf.getInt(
    +    MIN_EXECUTORS_KEY,
    +    math.max(1, receiverTracker.numReceivers))
    +  private val maxNumExecutors = conf.getInt(MAX_EXECUTORS_KEY, Integer.MAX_VALUE)
    +  private val timer = new RecurringTimer(clock, scalingIntervalSecs * 1000,
    +    _ => manageAllocation(), "streaming-executor-allocation-manager")
    +
    +  @volatile private var batchProcTimeSum = 0L
    +  @volatile private var batchProcTimeCount = 0
    +
    +  validateSettings()
    +
    +  def start(): Unit = {
    +    timer.start()
    +    logInfo(s"ExecutorAllocationManager started with " +
    +      s"ratios = [$scalingUpRatio, $scalingDownRatio] and interval = $scalingIntervalSecs sec")
    +  }
    +
    +  def stop(): Unit = {
    +    timer.stop(interruptTimer = true)
    +    logInfo("ExecutorAllocationManager stopped")
    +  }
    +
    +  private def manageAllocation(): Unit = synchronized {
    +    logInfo(s"Managing executor allocation with ratios = [$scalingUpRatio, $scalingDownRatio]")
    +    if (batchProcTimeCount > 0) {
    +      val averageBatchProcTime = batchProcTimeSum / batchProcTimeCount
    +      val ratio = averageBatchProcTime.toDouble / batchDurationMs
    +      logInfo(s"Average: $averageBatchProcTime, ratio = $ratio" )
    +      if (ratio >= scalingUpRatio) {
    +        logDebug("Requesting executors")
    +        val numNewExecutors = math.max(math.round(ratio).toInt, 1)
    +        requestExecutors(numNewExecutors)
    +      } else if (ratio <= scalingDownRatio) {
    +        logDebug("Killing executors")
    +        killExecutor()
    +      }
    +    }
    +    batchProcTimeSum = 0
    +    batchProcTimeCount = 0
    +  }
    +
    +  private def requestExecutors(numNewExecutors: Int): Unit = {
    +    require(numNewExecutors >= 1)
    +    val allExecIds = client.getExecutorIds()
    +    logDebug(s"Executors (${allExecIds.size}) = ${allExecIds}")
    +    val targetTotalExecutors =
    +      math.max(math.min(maxNumExecutors, allExecIds.size + numNewExecutors), minNumExecutors)
    --- End diff --
    
    there is some approximation here. I am assuming that the scaling interval will be high enough that each readjustment will not overlap with each other. we can make this more advanced in the future.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-XXX][STREAMING] Streaming dynamic alloc...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the pull request:

    https://github.com/apache/spark/pull/12154#issuecomment-205492191
  
    @andrewor14 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12133][STREAMING] Streaming dynamic all...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12154#discussion_r58757206
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala ---
    @@ -0,0 +1,217 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +
    +package org.apache.spark.streaming.scheduler
    +
    +import scala.util.Random
    +
    +import org.apache.spark.{ExecutorAllocationClient, SparkConf}
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.streaming.util.RecurringTimer
    +import org.apache.spark.util.{Clock, Utils}
    +
    +/**
    + * Class that manages executor allocated to a StreamingContext, and dynamically request or kill
    + * executors based on the statistics of the streaming computation. At a high level, the policy is:
    + * - Use StreamingListener interface get batch processing times of completed batches
    + * - Periodically take the average batch completion times and compare with the batch interval
    + * - If (avg. proc. time / batch interval) >= scaling up ratio, then request more executors
    + * - If (avg. proc. time / batch interval) <= scaling down ratio, then try to kill a executor that
    + *   is not running a receiver
    + */
    +private[streaming] class ExecutorAllocationManager(
    +    client: ExecutorAllocationClient,
    +    receiverTracker: ReceiverTracker,
    +    conf: SparkConf,
    +    batchDurationMs: Long,
    +    clock: Clock) extends StreamingListener with Logging {
    +
    +  import ExecutorAllocationManager._
    +
    +  private val scalingIntervalSecs = conf.getTimeAsSeconds(
    +    SCALING_INTERVAL_KEY,
    +    s"${SCALING_INTERVAL_DEFAULT_SECS}s")
    +  private val scalingUpRatio = conf.getDouble(SCALING_UP_RATIO_KEY, SCALING_UP_RATIO_DEFAULT)
    +  private val scalingDownRatio = conf.getDouble(SCALING_DOWN_RATIO_KEY, SCALING_DOWN_RATIO_DEFAULT)
    +  private val minNumExecutors = conf.getInt(
    +    MIN_EXECUTORS_KEY,
    +    math.max(1, receiverTracker.numReceivers))
    +  private val maxNumExecutors = conf.getInt(MAX_EXECUTORS_KEY, Integer.MAX_VALUE)
    +  private val timer = new RecurringTimer(clock, scalingIntervalSecs * 1000,
    +    _ => manageAllocation(), "streaming-executor-allocation-manager")
    +
    +  @volatile private var batchProcTimeSum = 0L
    +  @volatile private var batchProcTimeCount = 0
    +
    +  validateSettings()
    +
    +  def start(): Unit = {
    +    timer.start()
    +    logInfo(s"ExecutorAllocationManager started with " +
    +      s"ratios = [$scalingUpRatio, $scalingDownRatio] and interval = $scalingIntervalSecs sec")
    +  }
    +
    +  def stop(): Unit = {
    +    timer.stop(interruptTimer = true)
    +    logInfo("ExecutorAllocationManager stopped")
    +  }
    +
    +  private def manageAllocation(): Unit = synchronized {
    +    logInfo(s"Managing executor allocation with ratios = [$scalingUpRatio, $scalingDownRatio]")
    +    if (batchProcTimeCount > 0) {
    +      val averageBatchProcTime = batchProcTimeSum / batchProcTimeCount
    +      val ratio = averageBatchProcTime.toDouble / batchDurationMs
    +      logInfo(s"Average: $averageBatchProcTime, ratio = $ratio" )
    +      if (ratio >= scalingUpRatio) {
    +        logDebug("Requesting executors")
    +        val numNewExecutors = math.max(math.round(ratio).toInt, 1)
    +        requestExecutors(numNewExecutors)
    +      } else if (ratio <= scalingDownRatio) {
    +        logDebug("Killing executors")
    +        killExecutor()
    +      }
    +    }
    +    batchProcTimeSum = 0
    +    batchProcTimeCount = 0
    +  }
    +
    +  private def requestExecutors(numNewExecutors: Int): Unit = {
    +    require(numNewExecutors >= 1)
    +    val allExecIds = client.getExecutorIds()
    +    logDebug(s"Executors (${allExecIds.size}) = ${allExecIds}")
    +    val targetTotalExecutors =
    +      math.max(math.min(maxNumExecutors, allExecIds.size + numNewExecutors), minNumExecutors)
    +    client.requestTotalExecutors(targetTotalExecutors, 0, Map.empty)
    +    logInfo(s"Requested total $targetTotalExecutors executors")
    +  }
    +
    +  private def killExecutor(): Unit = {
    --- End diff --
    
    Done.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12133][STREAMING] Streaming dynamic all...

Posted by holdenk <gi...@git.apache.org>.
Github user holdenk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12154#discussion_r58454756
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala ---
    @@ -93,6 +103,10 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
           receiverTracker.stop(processAllReceivedData)
         }
     
    +    if (executorAllocationManager != null) {
    --- End diff --
    
    This null check seems unecessary since its initialized as None rather than null and - did you mean initialize executorAllocationManager as null to match the others or would this maybe get set to null elsewhere?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12133][STREAMING] Streaming dynamic all...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/12154#issuecomment-206572694
  
    Merged build finished. Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12133][STREAMING] Streaming dynamic all...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/12154#issuecomment-205564115
  
    **[Test build #54905 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/54905/consoleFull)** for PR 12154 at commit [`0c6d94b`](https://github.com/apache/spark/commit/0c6d94b54ff26ee58f5eb2c240768b51dd2227f2).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12133][STREAMING] Streaming dynamic all...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on the pull request:

    https://github.com/apache/spark/pull/12154#issuecomment-212538540
  
    @jayv big new features like this are never backported into older branches.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12133][STREAMING] Streaming dynamic all...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12154#discussion_r58759097
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala ---
    @@ -0,0 +1,217 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +
    +package org.apache.spark.streaming.scheduler
    +
    +import scala.util.Random
    +
    +import org.apache.spark.{ExecutorAllocationClient, SparkConf}
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.streaming.util.RecurringTimer
    +import org.apache.spark.util.{Clock, Utils}
    +
    +/**
    + * Class that manages executor allocated to a StreamingContext, and dynamically request or kill
    + * executors based on the statistics of the streaming computation. At a high level, the policy is:
    + * - Use StreamingListener interface get batch processing times of completed batches
    + * - Periodically take the average batch completion times and compare with the batch interval
    + * - If (avg. proc. time / batch interval) >= scaling up ratio, then request more executors
    + * - If (avg. proc. time / batch interval) <= scaling down ratio, then try to kill a executor that
    + *   is not running a receiver
    + */
    +private[streaming] class ExecutorAllocationManager(
    +    client: ExecutorAllocationClient,
    +    receiverTracker: ReceiverTracker,
    +    conf: SparkConf,
    +    batchDurationMs: Long,
    +    clock: Clock) extends StreamingListener with Logging {
    +
    +  import ExecutorAllocationManager._
    +
    +  private val scalingIntervalSecs = conf.getTimeAsSeconds(
    +    SCALING_INTERVAL_KEY,
    +    s"${SCALING_INTERVAL_DEFAULT_SECS}s")
    +  private val scalingUpRatio = conf.getDouble(SCALING_UP_RATIO_KEY, SCALING_UP_RATIO_DEFAULT)
    +  private val scalingDownRatio = conf.getDouble(SCALING_DOWN_RATIO_KEY, SCALING_DOWN_RATIO_DEFAULT)
    +  private val minNumExecutors = conf.getInt(
    +    MIN_EXECUTORS_KEY,
    +    math.max(1, receiverTracker.numReceivers))
    +  private val maxNumExecutors = conf.getInt(MAX_EXECUTORS_KEY, Integer.MAX_VALUE)
    +  private val timer = new RecurringTimer(clock, scalingIntervalSecs * 1000,
    +    _ => manageAllocation(), "streaming-executor-allocation-manager")
    +
    +  @volatile private var batchProcTimeSum = 0L
    +  @volatile private var batchProcTimeCount = 0
    +
    +  validateSettings()
    +
    +  def start(): Unit = {
    +    timer.start()
    +    logInfo(s"ExecutorAllocationManager started with " +
    +      s"ratios = [$scalingUpRatio, $scalingDownRatio] and interval = $scalingIntervalSecs sec")
    +  }
    +
    +  def stop(): Unit = {
    +    timer.stop(interruptTimer = true)
    +    logInfo("ExecutorAllocationManager stopped")
    +  }
    +
    +  private def manageAllocation(): Unit = synchronized {
    +    logInfo(s"Managing executor allocation with ratios = [$scalingUpRatio, $scalingDownRatio]")
    +    if (batchProcTimeCount > 0) {
    +      val averageBatchProcTime = batchProcTimeSum / batchProcTimeCount
    +      val ratio = averageBatchProcTime.toDouble / batchDurationMs
    +      logInfo(s"Average: $averageBatchProcTime, ratio = $ratio" )
    +      if (ratio >= scalingUpRatio) {
    +        logDebug("Requesting executors")
    +        val numNewExecutors = math.max(math.round(ratio).toInt, 1)
    +        requestExecutors(numNewExecutors)
    +      } else if (ratio <= scalingDownRatio) {
    +        logDebug("Killing executors")
    +        killExecutor()
    +      }
    +    }
    +    batchProcTimeSum = 0
    +    batchProcTimeCount = 0
    +  }
    +
    +  private def requestExecutors(numNewExecutors: Int): Unit = {
    +    require(numNewExecutors >= 1)
    +    val allExecIds = client.getExecutorIds()
    +    logDebug(s"Executors (${allExecIds.size}) = ${allExecIds}")
    +    val targetTotalExecutors =
    +      math.max(math.min(maxNumExecutors, allExecIds.size + numNewExecutors), minNumExecutors)
    +    client.requestTotalExecutors(targetTotalExecutors, 0, Map.empty)
    +    logInfo(s"Requested total $targetTotalExecutors executors")
    +  }
    +
    +  private def killExecutor(): Unit = {
    +    val allExecIds = client.getExecutorIds()
    +    logDebug(s"Executors (${allExecIds.size}) = ${allExecIds}")
    +
    +    if (allExecIds.nonEmpty && allExecIds.size > minNumExecutors) {
    +      val execIdsWithReceivers = receiverTracker.getAllocatedExecutors.values.flatten.toSeq
    +      logInfo(s"Executors with receivers (${execIdsWithReceivers.size}): ${execIdsWithReceivers}")
    +
    +      val removableExecIds = allExecIds.diff(execIdsWithReceivers)
    +      logDebug(s"Removable executors (${removableExecIds.size}): ${removableExecIds}")
    +      if (removableExecIds.nonEmpty) {
    +        val execIdToRemove = removableExecIds(Random.nextInt(removableExecIds.size))
    +        client.killExecutor(execIdToRemove)
    +        logInfo(s"Requested to kill executor $execIdToRemove")
    +      } else {
    +        logInfo(s"No non-receiver executors to kill")
    +      }
    +    } else {
    +      logInfo("No available executor to kill")
    +    }
    +  }
    +
    +  private def addBatchProcTime(timeMs: Long): Unit = synchronized {
    +    batchProcTimeSum += timeMs
    +    batchProcTimeCount += 1
    +    logDebug(
    +      s"Added batch processing time $timeMs, sum = $batchProcTimeSum, count = $batchProcTimeCount")
    +  }
    +
    +  private def validateSettings(): Unit = {
    +    require(
    +      scalingIntervalSecs > 0,
    +      s"Config $SCALING_INTERVAL_KEY must be more than 0")
    +
    +    require(
    +      scalingUpRatio > 0,
    +      s"Config $SCALING_UP_RATIO_KEY must be more than 0")
    +
    +    require(
    +      scalingDownRatio > 0,
    +      s"Config $SCALING_DOWN_RATIO_KEY must be more than 0")
    +
    +    require(
    +      minNumExecutors > 0,
    +      s"Config $MIN_EXECUTORS_KEY must be more than 0")
    +
    +    require(
    +      maxNumExecutors > 0,
    +      s"$MAX_EXECUTORS_KEY must be more than 0")
    +
    +    require(
    +      scalingUpRatio > scalingDownRatio,
    +      s"Config $SCALING_UP_RATIO_KEY must be more than config $SCALING_DOWN_RATIO_KEY")
    +
    +    if (conf.contains(MIN_EXECUTORS_KEY) && conf.contains(MAX_EXECUTORS_KEY)) {
    +      require(
    +        maxNumExecutors >= minNumExecutors,
    +        s"Config $MAX_EXECUTORS_KEY must be more than config $MIN_EXECUTORS_KEY")
    +    }
    +  }
    +
    +  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    +    logDebug("onBatchCompleted called: " + batchCompleted)
    +    if (!batchCompleted.batchInfo.outputOperationInfos.values.exists(_.failureReason.nonEmpty)) {
    +      batchCompleted.batchInfo.processingDelay.foreach(addBatchProcTime)
    +    }
    +  }
    +}
    +
    +private[streaming] object ExecutorAllocationManager extends Logging {
    +  val ENABLED_KEY = "spark.streaming.dynamicAllocation.enabled"
    +
    +  val SCALING_INTERVAL_KEY = "spark.streaming.dynamicAllocation.scalingInterval"
    +  val SCALING_INTERVAL_DEFAULT_SECS = 60
    +
    +  val SCALING_UP_RATIO_KEY = "spark.streaming.dynamicAllocation.scalingUpRatio"
    +  val SCALING_UP_RATIO_DEFAULT = 0.9
    +
    +  val SCALING_DOWN_RATIO_KEY = "spark.streaming.dynamicAllocation.scalingDownRatio"
    +  val SCALING_DOWN_RATIO_DEFAULT = 0.3
    +
    +  val MIN_EXECUTORS_KEY = "spark.streaming.dynamicAllocation.minExecutors"
    +
    +  val MAX_EXECUTORS_KEY = "spark.streaming.dynamicAllocation.maxExecutors"
    --- End diff --
    
    It is very confusing if configs inside `spark.streaming.dynamicAllocation.*` depends on configs in `spark.dynamicAllocation.*`. Very non intuitive and defeats the whole purpose of having config names be scoped with `.`s


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12133][STREAMING] Streaming dynamic all...

Posted by jayv <gi...@git.apache.org>.
Github user jayv commented on the pull request:

    https://github.com/apache/spark/pull/12154#issuecomment-212279971
  
    @tdas or @andrewor14 does this depend on any 2.0 APIs, I would like to backport this to 1.5 or 1.6 if possible.
    
    need to run multiple concurrent streaming jobs on mesos


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12133][STREAMING] Streaming dynamic all...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/12154#issuecomment-205566028
  
    **[Test build #2750 has started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/2750/consoleFull)** for PR 12154 at commit [`0c6d94b`](https://github.com/apache/spark/commit/0c6d94b54ff26ee58f5eb2c240768b51dd2227f2).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12133][STREAMING] Streaming dynamic all...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12154#discussion_r58461884
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala ---
    @@ -93,6 +103,10 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
           receiverTracker.stop(processAllReceivedData)
         }
     
    +    if (executorAllocationManager != null) {
    --- End diff --
    
    Its not supposed to be used as null, but since its var, just avoiding unnecessary NPEs in the future.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12133][STREAMING] Streaming dynamic all...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/12154#issuecomment-205527443
  
    **[Test build #54905 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/54905/consoleFull)** for PR 12154 at commit [`0c6d94b`](https://github.com/apache/spark/commit/0c6d94b54ff26ee58f5eb2c240768b51dd2227f2).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12133][STREAMING] Streaming dynamic all...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12154#discussion_r58496003
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala ---
    @@ -0,0 +1,217 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +
    +package org.apache.spark.streaming.scheduler
    +
    +import scala.util.Random
    +
    +import org.apache.spark.{ExecutorAllocationClient, SparkConf}
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.streaming.util.RecurringTimer
    +import org.apache.spark.util.{Clock, Utils}
    +
    +/**
    + * Class that manages executor allocated to a StreamingContext, and dynamically request or kill
    + * executors based on the statistics of the streaming computation. At a high level, the policy is:
    + * - Use StreamingListener interface get batch processing times of completed batches
    + * - Periodically take the average batch completion times and compare with the batch interval
    + * - If (avg. proc. time / batch interval) >= scaling up ratio, then request more executors
    --- End diff --
    
    can you expand on this java doc to comment on how this is different from the core dynamic allocation? We should mention that this intends to stabilize the system over time gradually by requesting and killing executors 1 at a time. Bonus points if you add a paragraph on how well this works with backpressure.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #12154: [SPARK-12133][STREAMING] Streaming dynamic alloca...

Posted by alunarbeach <gi...@git.apache.org>.
Github user alunarbeach commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12154#discussion_r99911965
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala ---
    @@ -0,0 +1,217 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +
    +package org.apache.spark.streaming.scheduler
    +
    +import scala.util.Random
    +
    +import org.apache.spark.{ExecutorAllocationClient, SparkConf}
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.streaming.util.RecurringTimer
    +import org.apache.spark.util.{Clock, Utils}
    +
    +/**
    + * Class that manages executor allocated to a StreamingContext, and dynamically request or kill
    + * executors based on the statistics of the streaming computation. At a high level, the policy is:
    + * - Use StreamingListener interface get batch processing times of completed batches
    + * - Periodically take the average batch completion times and compare with the batch interval
    + * - If (avg. proc. time / batch interval) >= scaling up ratio, then request more executors
    + * - If (avg. proc. time / batch interval) <= scaling down ratio, then try to kill a executor that
    + *   is not running a receiver
    + */
    +private[streaming] class ExecutorAllocationManager(
    +    client: ExecutorAllocationClient,
    +    receiverTracker: ReceiverTracker,
    +    conf: SparkConf,
    +    batchDurationMs: Long,
    +    clock: Clock) extends StreamingListener with Logging {
    +
    +  import ExecutorAllocationManager._
    +
    +  private val scalingIntervalSecs = conf.getTimeAsSeconds(
    +    SCALING_INTERVAL_KEY,
    +    s"${SCALING_INTERVAL_DEFAULT_SECS}s")
    +  private val scalingUpRatio = conf.getDouble(SCALING_UP_RATIO_KEY, SCALING_UP_RATIO_DEFAULT)
    +  private val scalingDownRatio = conf.getDouble(SCALING_DOWN_RATIO_KEY, SCALING_DOWN_RATIO_DEFAULT)
    +  private val minNumExecutors = conf.getInt(
    +    MIN_EXECUTORS_KEY,
    +    math.max(1, receiverTracker.numReceivers))
    +  private val maxNumExecutors = conf.getInt(MAX_EXECUTORS_KEY, Integer.MAX_VALUE)
    +  private val timer = new RecurringTimer(clock, scalingIntervalSecs * 1000,
    +    _ => manageAllocation(), "streaming-executor-allocation-manager")
    +
    +  @volatile private var batchProcTimeSum = 0L
    +  @volatile private var batchProcTimeCount = 0
    +
    +  validateSettings()
    +
    +  def start(): Unit = {
    +    timer.start()
    +    logInfo(s"ExecutorAllocationManager started with " +
    +      s"ratios = [$scalingUpRatio, $scalingDownRatio] and interval = $scalingIntervalSecs sec")
    +  }
    +
    +  def stop(): Unit = {
    +    timer.stop(interruptTimer = true)
    +    logInfo("ExecutorAllocationManager stopped")
    +  }
    +
    +  private def manageAllocation(): Unit = synchronized {
    +    logInfo(s"Managing executor allocation with ratios = [$scalingUpRatio, $scalingDownRatio]")
    +    if (batchProcTimeCount > 0) {
    +      val averageBatchProcTime = batchProcTimeSum / batchProcTimeCount
    +      val ratio = averageBatchProcTime.toDouble / batchDurationMs
    +      logInfo(s"Average: $averageBatchProcTime, ratio = $ratio" )
    +      if (ratio >= scalingUpRatio) {
    +        logDebug("Requesting executors")
    +        val numNewExecutors = math.max(math.round(ratio).toInt, 1)
    +        requestExecutors(numNewExecutors)
    +      } else if (ratio <= scalingDownRatio) {
    +        logDebug("Killing executors")
    +        killExecutor()
    +      }
    +    }
    +    batchProcTimeSum = 0
    +    batchProcTimeCount = 0
    +  }
    +
    +  private def requestExecutors(numNewExecutors: Int): Unit = {
    +    require(numNewExecutors >= 1)
    +    val allExecIds = client.getExecutorIds()
    +    logDebug(s"Executors (${allExecIds.size}) = ${allExecIds}")
    +    val targetTotalExecutors =
    +      math.max(math.min(maxNumExecutors, allExecIds.size + numNewExecutors), minNumExecutors)
    +    client.requestTotalExecutors(targetTotalExecutors, 0, Map.empty)
    +    logInfo(s"Requested total $targetTotalExecutors executors")
    +  }
    +
    +  private def killExecutor(): Unit = {
    +    val allExecIds = client.getExecutorIds()
    +    logDebug(s"Executors (${allExecIds.size}) = ${allExecIds}")
    +
    +    if (allExecIds.nonEmpty && allExecIds.size > minNumExecutors) {
    +      val execIdsWithReceivers = receiverTracker.getAllocatedExecutors.values.flatten.toSeq
    +      logInfo(s"Executors with receivers (${execIdsWithReceivers.size}): ${execIdsWithReceivers}")
    +
    +      val removableExecIds = allExecIds.diff(execIdsWithReceivers)
    +      logDebug(s"Removable executors (${removableExecIds.size}): ${removableExecIds}")
    +      if (removableExecIds.nonEmpty) {
    +        val execIdToRemove = removableExecIds(Random.nextInt(removableExecIds.size))
    +        client.killExecutor(execIdToRemove)
    +        logInfo(s"Requested to kill executor $execIdToRemove")
    +      } else {
    +        logInfo(s"No non-receiver executors to kill")
    +      }
    +    } else {
    +      logInfo("No available executor to kill")
    +    }
    +  }
    +
    +  private def addBatchProcTime(timeMs: Long): Unit = synchronized {
    +    batchProcTimeSum += timeMs
    +    batchProcTimeCount += 1
    +    logDebug(
    +      s"Added batch processing time $timeMs, sum = $batchProcTimeSum, count = $batchProcTimeCount")
    +  }
    +
    +  private def validateSettings(): Unit = {
    +    require(
    +      scalingIntervalSecs > 0,
    +      s"Config $SCALING_INTERVAL_KEY must be more than 0")
    +
    +    require(
    +      scalingUpRatio > 0,
    +      s"Config $SCALING_UP_RATIO_KEY must be more than 0")
    +
    +    require(
    +      scalingDownRatio > 0,
    +      s"Config $SCALING_DOWN_RATIO_KEY must be more than 0")
    +
    +    require(
    +      minNumExecutors > 0,
    +      s"Config $MIN_EXECUTORS_KEY must be more than 0")
    +
    +    require(
    +      maxNumExecutors > 0,
    +      s"$MAX_EXECUTORS_KEY must be more than 0")
    +
    +    require(
    +      scalingUpRatio > scalingDownRatio,
    +      s"Config $SCALING_UP_RATIO_KEY must be more than config $SCALING_DOWN_RATIO_KEY")
    +
    +    if (conf.contains(MIN_EXECUTORS_KEY) && conf.contains(MAX_EXECUTORS_KEY)) {
    +      require(
    +        maxNumExecutors >= minNumExecutors,
    +        s"Config $MAX_EXECUTORS_KEY must be more than config $MIN_EXECUTORS_KEY")
    +    }
    +  }
    +
    +  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    +    logDebug("onBatchCompleted called: " + batchCompleted)
    +    if (!batchCompleted.batchInfo.outputOperationInfos.values.exists(_.failureReason.nonEmpty)) {
    +      batchCompleted.batchInfo.processingDelay.foreach(addBatchProcTime)
    +    }
    +  }
    +}
    +
    +private[streaming] object ExecutorAllocationManager extends Logging {
    +  val ENABLED_KEY = "spark.streaming.dynamicAllocation.enabled"
    +
    +  val SCALING_INTERVAL_KEY = "spark.streaming.dynamicAllocation.scalingInterval"
    +  val SCALING_INTERVAL_DEFAULT_SECS = 60
    +
    +  val SCALING_UP_RATIO_KEY = "spark.streaming.dynamicAllocation.scalingUpRatio"
    +  val SCALING_UP_RATIO_DEFAULT = 0.9
    +
    +  val SCALING_DOWN_RATIO_KEY = "spark.streaming.dynamicAllocation.scalingDownRatio"
    +  val SCALING_DOWN_RATIO_DEFAULT = 0.3
    +
    +  val MIN_EXECUTORS_KEY = "spark.streaming.dynamicAllocation.minExecutors"
    +
    +  val MAX_EXECUTORS_KEY = "spark.streaming.dynamicAllocation.maxExecutors"
    --- End diff --
    
    @tdas Is there any particular reason, why initExecutors is not supported in streaming.dynamicAllocation?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12133][STREAMING] Streaming dynamic all...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12154#discussion_r58496898
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala ---
    @@ -0,0 +1,217 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +
    +package org.apache.spark.streaming.scheduler
    +
    +import scala.util.Random
    +
    +import org.apache.spark.{ExecutorAllocationClient, SparkConf}
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.streaming.util.RecurringTimer
    +import org.apache.spark.util.{Clock, Utils}
    +
    +/**
    + * Class that manages executor allocated to a StreamingContext, and dynamically request or kill
    + * executors based on the statistics of the streaming computation. At a high level, the policy is:
    + * - Use StreamingListener interface get batch processing times of completed batches
    + * - Periodically take the average batch completion times and compare with the batch interval
    + * - If (avg. proc. time / batch interval) >= scaling up ratio, then request more executors
    + * - If (avg. proc. time / batch interval) <= scaling down ratio, then try to kill a executor that
    + *   is not running a receiver
    + */
    +private[streaming] class ExecutorAllocationManager(
    +    client: ExecutorAllocationClient,
    +    receiverTracker: ReceiverTracker,
    +    conf: SparkConf,
    +    batchDurationMs: Long,
    +    clock: Clock) extends StreamingListener with Logging {
    +
    +  import ExecutorAllocationManager._
    +
    +  private val scalingIntervalSecs = conf.getTimeAsSeconds(
    +    SCALING_INTERVAL_KEY,
    +    s"${SCALING_INTERVAL_DEFAULT_SECS}s")
    +  private val scalingUpRatio = conf.getDouble(SCALING_UP_RATIO_KEY, SCALING_UP_RATIO_DEFAULT)
    +  private val scalingDownRatio = conf.getDouble(SCALING_DOWN_RATIO_KEY, SCALING_DOWN_RATIO_DEFAULT)
    +  private val minNumExecutors = conf.getInt(
    +    MIN_EXECUTORS_KEY,
    +    math.max(1, receiverTracker.numReceivers))
    +  private val maxNumExecutors = conf.getInt(MAX_EXECUTORS_KEY, Integer.MAX_VALUE)
    +  private val timer = new RecurringTimer(clock, scalingIntervalSecs * 1000,
    +    _ => manageAllocation(), "streaming-executor-allocation-manager")
    +
    +  @volatile private var batchProcTimeSum = 0L
    +  @volatile private var batchProcTimeCount = 0
    +
    +  validateSettings()
    +
    +  def start(): Unit = {
    +    timer.start()
    +    logInfo(s"ExecutorAllocationManager started with " +
    +      s"ratios = [$scalingUpRatio, $scalingDownRatio] and interval = $scalingIntervalSecs sec")
    +  }
    +
    +  def stop(): Unit = {
    +    timer.stop(interruptTimer = true)
    +    logInfo("ExecutorAllocationManager stopped")
    +  }
    +
    +  private def manageAllocation(): Unit = synchronized {
    +    logInfo(s"Managing executor allocation with ratios = [$scalingUpRatio, $scalingDownRatio]")
    +    if (batchProcTimeCount > 0) {
    +      val averageBatchProcTime = batchProcTimeSum / batchProcTimeCount
    +      val ratio = averageBatchProcTime.toDouble / batchDurationMs
    +      logInfo(s"Average: $averageBatchProcTime, ratio = $ratio" )
    +      if (ratio >= scalingUpRatio) {
    +        logDebug("Requesting executors")
    +        val numNewExecutors = math.max(math.round(ratio).toInt, 1)
    +        requestExecutors(numNewExecutors)
    +      } else if (ratio <= scalingDownRatio) {
    +        logDebug("Killing executors")
    +        killExecutor()
    +      }
    +    }
    +    batchProcTimeSum = 0
    +    batchProcTimeCount = 0
    +  }
    +
    +  private def requestExecutors(numNewExecutors: Int): Unit = {
    +    require(numNewExecutors >= 1)
    +    val allExecIds = client.getExecutorIds()
    +    logDebug(s"Executors (${allExecIds.size}) = ${allExecIds}")
    +    val targetTotalExecutors =
    +      math.max(math.min(maxNumExecutors, allExecIds.size + numNewExecutors), minNumExecutors)
    +    client.requestTotalExecutors(targetTotalExecutors, 0, Map.empty)
    +    logInfo(s"Requested total $targetTotalExecutors executors")
    +  }
    +
    +  private def killExecutor(): Unit = {
    +    val allExecIds = client.getExecutorIds()
    +    logDebug(s"Executors (${allExecIds.size}) = ${allExecIds}")
    +
    +    if (allExecIds.nonEmpty && allExecIds.size > minNumExecutors) {
    +      val execIdsWithReceivers = receiverTracker.getAllocatedExecutors.values.flatten.toSeq
    +      logInfo(s"Executors with receivers (${execIdsWithReceivers.size}): ${execIdsWithReceivers}")
    +
    +      val removableExecIds = allExecIds.diff(execIdsWithReceivers)
    +      logDebug(s"Removable executors (${removableExecIds.size}): ${removableExecIds}")
    +      if (removableExecIds.nonEmpty) {
    +        val execIdToRemove = removableExecIds(Random.nextInt(removableExecIds.size))
    +        client.killExecutor(execIdToRemove)
    +        logInfo(s"Requested to kill executor $execIdToRemove")
    +      } else {
    +        logInfo(s"No non-receiver executors to kill")
    +      }
    +    } else {
    +      logInfo("No available executor to kill")
    +    }
    +  }
    +
    +  private def addBatchProcTime(timeMs: Long): Unit = synchronized {
    +    batchProcTimeSum += timeMs
    +    batchProcTimeCount += 1
    +    logDebug(
    +      s"Added batch processing time $timeMs, sum = $batchProcTimeSum, count = $batchProcTimeCount")
    +  }
    +
    +  private def validateSettings(): Unit = {
    +    require(
    +      scalingIntervalSecs > 0,
    +      s"Config $SCALING_INTERVAL_KEY must be more than 0")
    +
    +    require(
    +      scalingUpRatio > 0,
    +      s"Config $SCALING_UP_RATIO_KEY must be more than 0")
    +
    +    require(
    +      scalingDownRatio > 0,
    +      s"Config $SCALING_DOWN_RATIO_KEY must be more than 0")
    +
    +    require(
    +      minNumExecutors > 0,
    +      s"Config $MIN_EXECUTORS_KEY must be more than 0")
    +
    +    require(
    +      maxNumExecutors > 0,
    +      s"$MAX_EXECUTORS_KEY must be more than 0")
    +
    +    require(
    +      scalingUpRatio > scalingDownRatio,
    +      s"Config $SCALING_UP_RATIO_KEY must be more than config $SCALING_DOWN_RATIO_KEY")
    +
    +    if (conf.contains(MIN_EXECUTORS_KEY) && conf.contains(MAX_EXECUTORS_KEY)) {
    +      require(
    +        maxNumExecutors >= minNumExecutors,
    +        s"Config $MAX_EXECUTORS_KEY must be more than config $MIN_EXECUTORS_KEY")
    +    }
    +  }
    +
    +  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    +    logDebug("onBatchCompleted called: " + batchCompleted)
    +    if (!batchCompleted.batchInfo.outputOperationInfos.values.exists(_.failureReason.nonEmpty)) {
    +      batchCompleted.batchInfo.processingDelay.foreach(addBatchProcTime)
    +    }
    +  }
    +}
    +
    +private[streaming] object ExecutorAllocationManager extends Logging {
    +  val ENABLED_KEY = "spark.streaming.dynamicAllocation.enabled"
    +
    +  val SCALING_INTERVAL_KEY = "spark.streaming.dynamicAllocation.scalingInterval"
    +  val SCALING_INTERVAL_DEFAULT_SECS = 60
    +
    +  val SCALING_UP_RATIO_KEY = "spark.streaming.dynamicAllocation.scalingUpRatio"
    +  val SCALING_UP_RATIO_DEFAULT = 0.9
    +
    +  val SCALING_DOWN_RATIO_KEY = "spark.streaming.dynamicAllocation.scalingDownRatio"
    +  val SCALING_DOWN_RATIO_DEFAULT = 0.3
    +
    +  val MIN_EXECUTORS_KEY = "spark.streaming.dynamicAllocation.minExecutors"
    +
    +  val MAX_EXECUTORS_KEY = "spark.streaming.dynamicAllocation.maxExecutors"
    +
    +  def isDynamicAllocationEnabled(conf: SparkConf): Boolean = {
    +    val numExecutor = conf.getInt("spark.executor.instances", 0)
    +    val streamingDynamicAllocationEnabled = conf.getBoolean(ENABLED_KEY, false)
    +    if (numExecutor != 0 && streamingDynamicAllocationEnabled) {
    +      throw new IllegalArgumentException(
    +        "Dynamic Allocation and num executors both set, thus dynamic allocation disabled.")
    --- End diff --
    
    Should this return false? Doesn't throwing an exception crash the driver, so it doesn't matter whether "dynamic allocation is disabled"?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12133][STREAMING] Streaming dynamic all...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/12154#issuecomment-205532461
  
    Merged build finished. Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12133][STREAMING] Streaming dynamic all...

Posted by holdenk <gi...@git.apache.org>.
Github user holdenk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12154#discussion_r58453993
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala ---
    @@ -43,7 +43,7 @@ import org.apache.spark.storage.StorageLevel
     import org.apache.spark.streaming.StreamingContextState._
     import org.apache.spark.streaming.dstream._
     import org.apache.spark.streaming.receiver.Receiver
    -import org.apache.spark.streaming.scheduler.{JobScheduler, StreamingListener}
    +import org.apache.spark.streaming.scheduler.{ExecutorAllocationManager, JobScheduler, StreamingListener}
    --- End diff --
    
    Is the `ExecutorAllocationManager` import necessary? It doesn't seem to be referenced here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12133][STREAMING] Streaming dynamic all...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12154#discussion_r58756641
  
    --- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
    @@ -1360,6 +1360,16 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
         listenerBus.addListener(listener)
       }
     
    +  private[spark] override def getExecutorIds(): Seq[String] = {
    --- End diff --
    
    Changed. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12133][STREAMING] Streaming dynamic all...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12154#discussion_r58756912
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala ---
    @@ -0,0 +1,217 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +
    +package org.apache.spark.streaming.scheduler
    +
    +import scala.util.Random
    +
    +import org.apache.spark.{ExecutorAllocationClient, SparkConf}
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.streaming.util.RecurringTimer
    +import org.apache.spark.util.{Clock, Utils}
    +
    +/**
    + * Class that manages executor allocated to a StreamingContext, and dynamically request or kill
    + * executors based on the statistics of the streaming computation. At a high level, the policy is:
    + * - Use StreamingListener interface get batch processing times of completed batches
    + * - Periodically take the average batch completion times and compare with the batch interval
    + * - If (avg. proc. time / batch interval) >= scaling up ratio, then request more executors
    --- End diff --
    
    Done.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12133][STREAMING] Streaming dynamic all...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/12154#issuecomment-206508408
  
    **[Test build #2759 has started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/2759/consoleFull)** for PR 12154 at commit [`3b501a0`](https://github.com/apache/spark/commit/3b501a00bfb48a99a9d763a62ae20f8e4e3e6f46).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12133][STREAMING] Streaming dynamic all...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/12154#issuecomment-206509080
  
    **[Test build #55136 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/55136/consoleFull)** for PR 12154 at commit [`0598c85`](https://github.com/apache/spark/commit/0598c850b374384d9b45a6707817b39bde279db9).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12133][STREAMING] Streaming dynamic all...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/12154#issuecomment-206572700
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/55140/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12133][STREAMING] Streaming dynamic all...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/12154


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12133][STREAMING] Streaming dynamic all...

Posted by jayv <gi...@git.apache.org>.
Github user jayv commented on the pull request:

    https://github.com/apache/spark/pull/12154#issuecomment-212539770
  
    I understand that, but I want to port this feature to our internal custom 1.6 build, if it's not too much trouble.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12133][STREAMING] Streaming dynamic all...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/12154#issuecomment-206524873
  
    **[Test build #55140 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/55140/consoleFull)** for PR 12154 at commit [`ce36c76`](https://github.com/apache/spark/commit/ce36c76fac5a8f7d9bafc6f53a266803b548e53f).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12133][STREAMING] Streaming dynamic all...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12154#discussion_r58758723
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala ---
    @@ -234,6 +236,16 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
         }
       }
     
    +  def getAllocatedExecutors(): Map[Int, Option[String]] = {
    --- End diff --
    
    Done.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12133][STREAMING] Streaming dynamic all...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on the pull request:

    https://github.com/apache/spark/pull/12154#issuecomment-205699344
  
    @tdas Looks great. I think you could add more comments in the code but the rest is pretty good.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12133][STREAMING] Streaming dynamic all...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the pull request:

    https://github.com/apache/spark/pull/12154#issuecomment-206508255
  
    @andrewor14 Updated. Please take a look.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12133][STREAMING] Streaming dynamic all...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/12154#issuecomment-205564485
  
    Merged build finished. Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12133][STREAMING] Streaming dynamic all...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12154#discussion_r58762264
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala ---
    @@ -0,0 +1,233 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +
    +package org.apache.spark.streaming.scheduler
    +
    +import scala.util.Random
    +
    +import org.apache.spark.{ExecutorAllocationClient, SparkConf}
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.streaming.util.RecurringTimer
    +import org.apache.spark.util.{Clock, Utils}
    +
    +/**
    + * Class that manages executor allocated to a StreamingContext, and dynamically request or kill
    + * executors based on the statistics of the streaming computation. This is different from the core
    + * dynamic allocation policy; the core policy relies on executors being idle for a while, but the
    + * micro-batch model of streaming prevents any particular executors from being idle for a long
    + * time. Instead, the measure of "idle-ness" needs to be based on the time taken to process
    + * each batch.
    + *
    + * At a high level, the policy implemented by this class is as follows:
    + * - Use StreamingListener interface get batch processing times of completed batches
    + * - Periodically take the average batch completion times and compare with the batch interval
    + * - If (avg. proc. time / batch interval) >= scaling up ratio, then request more executors.
    + *   The number of executors requested is based on the ratio = (avg. proc. time / batch interval).
    + * - If (avg. proc. time / batch interval) <= scaling down ratio, then try to kill a executor that
    + *   is not running a receiver.
    + *
    + * This features should ideally be used in conjunction with backpressure, as backpressure ensures
    + * system stability, while executors are being readjusted.
    + */
    +private[streaming] class ExecutorAllocationManager(
    +    client: ExecutorAllocationClient,
    +    receiverTracker: ReceiverTracker,
    +    conf: SparkConf,
    +    batchDurationMs: Long,
    +    clock: Clock) extends StreamingListener with Logging {
    +
    +  import ExecutorAllocationManager._
    +
    +  private val scalingIntervalSecs = conf.getTimeAsSeconds(
    +    SCALING_INTERVAL_KEY,
    +    s"${SCALING_INTERVAL_DEFAULT_SECS}s")
    +  private val scalingUpRatio = conf.getDouble(SCALING_UP_RATIO_KEY, SCALING_UP_RATIO_DEFAULT)
    +  private val scalingDownRatio = conf.getDouble(SCALING_DOWN_RATIO_KEY, SCALING_DOWN_RATIO_DEFAULT)
    +  private val minNumExecutors = conf.getInt(
    +    MIN_EXECUTORS_KEY,
    +    math.max(1, receiverTracker.numReceivers))
    +  private val maxNumExecutors = conf.getInt(MAX_EXECUTORS_KEY, Integer.MAX_VALUE)
    +  private val timer = new RecurringTimer(clock, scalingIntervalSecs * 1000,
    +    _ => manageAllocation(), "streaming-executor-allocation-manager")
    +
    +  @volatile private var batchProcTimeSum = 0L
    +  @volatile private var batchProcTimeCount = 0
    +
    +  validateSettings()
    +
    +  def start(): Unit = {
    +    timer.start()
    +    logInfo(s"ExecutorAllocationManager started with " +
    +      s"ratios = [$scalingUpRatio, $scalingDownRatio] and interval = $scalingIntervalSecs sec")
    +  }
    +
    +  def stop(): Unit = {
    +    timer.stop(interruptTimer = true)
    +    logInfo("ExecutorAllocationManager stopped")
    +  }
    +
    +  /**
    +   * Manage executor allocation by requesting or killing executors based on the collected
    +   * batch statistics.
    +   */
    +  private def manageAllocation(): Unit = synchronized {
    +    logInfo(s"Managing executor allocation with ratios = [$scalingUpRatio, $scalingDownRatio]")
    +    if (batchProcTimeCount > 0) {
    +      val averageBatchProcTime = batchProcTimeSum / batchProcTimeCount
    +      val ratio = averageBatchProcTime.toDouble / batchDurationMs
    +      logInfo(s"Average: $averageBatchProcTime, ratio = $ratio" )
    +      if (ratio >= scalingUpRatio) {
    +        logDebug("Requesting executors")
    +        val numNewExecutors = math.max(math.round(ratio).toInt, 1)
    +        requestExecutors(numNewExecutors)
    +      } else if (ratio <= scalingDownRatio) {
    +        logDebug("Killing executors")
    +        killExecutor()
    +      }
    +    }
    +    batchProcTimeSum = 0
    +    batchProcTimeCount = 0
    +  }
    +
    +  /** Request the specified number of executors over the currently active one */
    +  private def requestExecutors(numNewExecutors: Int): Unit = {
    +    require(numNewExecutors >= 1)
    +    val allExecIds = client.getExecutorIds()
    +    logDebug(s"Executors (${allExecIds.size}) = ${allExecIds}")
    +    val targetTotalExecutors =
    +      math.max(math.min(maxNumExecutors, allExecIds.size + numNewExecutors), minNumExecutors)
    +    client.requestTotalExecutors(targetTotalExecutors, 0, Map.empty)
    +    logInfo(s"Requested total $targetTotalExecutors executors")
    +  }
    +
    +  /** Kill a executors that is not running a receiver */
    --- End diff --
    
    grammar


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12133][STREAMING] Streaming dynamic all...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12154#discussion_r58496298
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala ---
    @@ -0,0 +1,217 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +
    +package org.apache.spark.streaming.scheduler
    +
    +import scala.util.Random
    +
    +import org.apache.spark.{ExecutorAllocationClient, SparkConf}
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.streaming.util.RecurringTimer
    +import org.apache.spark.util.{Clock, Utils}
    +
    +/**
    + * Class that manages executor allocated to a StreamingContext, and dynamically request or kill
    + * executors based on the statistics of the streaming computation. At a high level, the policy is:
    + * - Use StreamingListener interface get batch processing times of completed batches
    + * - Periodically take the average batch completion times and compare with the batch interval
    + * - If (avg. proc. time / batch interval) >= scaling up ratio, then request more executors
    + * - If (avg. proc. time / batch interval) <= scaling down ratio, then try to kill a executor that
    + *   is not running a receiver
    + */
    +private[streaming] class ExecutorAllocationManager(
    +    client: ExecutorAllocationClient,
    +    receiverTracker: ReceiverTracker,
    +    conf: SparkConf,
    +    batchDurationMs: Long,
    +    clock: Clock) extends StreamingListener with Logging {
    +
    +  import ExecutorAllocationManager._
    +
    +  private val scalingIntervalSecs = conf.getTimeAsSeconds(
    +    SCALING_INTERVAL_KEY,
    +    s"${SCALING_INTERVAL_DEFAULT_SECS}s")
    +  private val scalingUpRatio = conf.getDouble(SCALING_UP_RATIO_KEY, SCALING_UP_RATIO_DEFAULT)
    +  private val scalingDownRatio = conf.getDouble(SCALING_DOWN_RATIO_KEY, SCALING_DOWN_RATIO_DEFAULT)
    +  private val minNumExecutors = conf.getInt(
    +    MIN_EXECUTORS_KEY,
    +    math.max(1, receiverTracker.numReceivers))
    +  private val maxNumExecutors = conf.getInt(MAX_EXECUTORS_KEY, Integer.MAX_VALUE)
    +  private val timer = new RecurringTimer(clock, scalingIntervalSecs * 1000,
    +    _ => manageAllocation(), "streaming-executor-allocation-manager")
    +
    +  @volatile private var batchProcTimeSum = 0L
    +  @volatile private var batchProcTimeCount = 0
    +
    +  validateSettings()
    +
    +  def start(): Unit = {
    +    timer.start()
    +    logInfo(s"ExecutorAllocationManager started with " +
    +      s"ratios = [$scalingUpRatio, $scalingDownRatio] and interval = $scalingIntervalSecs sec")
    +  }
    +
    +  def stop(): Unit = {
    +    timer.stop(interruptTimer = true)
    +    logInfo("ExecutorAllocationManager stopped")
    +  }
    +
    +  private def manageAllocation(): Unit = synchronized {
    +    logInfo(s"Managing executor allocation with ratios = [$scalingUpRatio, $scalingDownRatio]")
    +    if (batchProcTimeCount > 0) {
    +      val averageBatchProcTime = batchProcTimeSum / batchProcTimeCount
    +      val ratio = averageBatchProcTime.toDouble / batchDurationMs
    +      logInfo(s"Average: $averageBatchProcTime, ratio = $ratio" )
    +      if (ratio >= scalingUpRatio) {
    +        logDebug("Requesting executors")
    +        val numNewExecutors = math.max(math.round(ratio).toInt, 1)
    +        requestExecutors(numNewExecutors)
    +      } else if (ratio <= scalingDownRatio) {
    +        logDebug("Killing executors")
    +        killExecutor()
    +      }
    +    }
    +    batchProcTimeSum = 0
    +    batchProcTimeCount = 0
    +  }
    +
    +  private def requestExecutors(numNewExecutors: Int): Unit = {
    +    require(numNewExecutors >= 1)
    +    val allExecIds = client.getExecutorIds()
    +    logDebug(s"Executors (${allExecIds.size}) = ${allExecIds}")
    +    val targetTotalExecutors =
    +      math.max(math.min(maxNumExecutors, allExecIds.size + numNewExecutors), minNumExecutors)
    +    client.requestTotalExecutors(targetTotalExecutors, 0, Map.empty)
    +    logInfo(s"Requested total $targetTotalExecutors executors")
    +  }
    +
    +  private def killExecutor(): Unit = {
    --- End diff --
    
    can you add like a 1 line comment on these methods. How does this decide which / how many executors to kill? What about receivers?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12133][STREAMING] Streaming dynamic all...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/12154#issuecomment-205564487
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/54905/
    Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12133][STREAMING] Streaming dynamic all...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12154#discussion_r58496601
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala ---
    @@ -0,0 +1,217 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +
    +package org.apache.spark.streaming.scheduler
    +
    +import scala.util.Random
    +
    +import org.apache.spark.{ExecutorAllocationClient, SparkConf}
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.streaming.util.RecurringTimer
    +import org.apache.spark.util.{Clock, Utils}
    +
    +/**
    + * Class that manages executor allocated to a StreamingContext, and dynamically request or kill
    + * executors based on the statistics of the streaming computation. At a high level, the policy is:
    + * - Use StreamingListener interface get batch processing times of completed batches
    + * - Periodically take the average batch completion times and compare with the batch interval
    + * - If (avg. proc. time / batch interval) >= scaling up ratio, then request more executors
    + * - If (avg. proc. time / batch interval) <= scaling down ratio, then try to kill a executor that
    + *   is not running a receiver
    + */
    +private[streaming] class ExecutorAllocationManager(
    +    client: ExecutorAllocationClient,
    +    receiverTracker: ReceiverTracker,
    +    conf: SparkConf,
    +    batchDurationMs: Long,
    +    clock: Clock) extends StreamingListener with Logging {
    +
    +  import ExecutorAllocationManager._
    +
    +  private val scalingIntervalSecs = conf.getTimeAsSeconds(
    +    SCALING_INTERVAL_KEY,
    +    s"${SCALING_INTERVAL_DEFAULT_SECS}s")
    +  private val scalingUpRatio = conf.getDouble(SCALING_UP_RATIO_KEY, SCALING_UP_RATIO_DEFAULT)
    +  private val scalingDownRatio = conf.getDouble(SCALING_DOWN_RATIO_KEY, SCALING_DOWN_RATIO_DEFAULT)
    +  private val minNumExecutors = conf.getInt(
    +    MIN_EXECUTORS_KEY,
    +    math.max(1, receiverTracker.numReceivers))
    +  private val maxNumExecutors = conf.getInt(MAX_EXECUTORS_KEY, Integer.MAX_VALUE)
    +  private val timer = new RecurringTimer(clock, scalingIntervalSecs * 1000,
    +    _ => manageAllocation(), "streaming-executor-allocation-manager")
    +
    +  @volatile private var batchProcTimeSum = 0L
    +  @volatile private var batchProcTimeCount = 0
    +
    +  validateSettings()
    +
    +  def start(): Unit = {
    +    timer.start()
    +    logInfo(s"ExecutorAllocationManager started with " +
    +      s"ratios = [$scalingUpRatio, $scalingDownRatio] and interval = $scalingIntervalSecs sec")
    +  }
    +
    +  def stop(): Unit = {
    +    timer.stop(interruptTimer = true)
    +    logInfo("ExecutorAllocationManager stopped")
    +  }
    +
    +  private def manageAllocation(): Unit = synchronized {
    +    logInfo(s"Managing executor allocation with ratios = [$scalingUpRatio, $scalingDownRatio]")
    +    if (batchProcTimeCount > 0) {
    +      val averageBatchProcTime = batchProcTimeSum / batchProcTimeCount
    +      val ratio = averageBatchProcTime.toDouble / batchDurationMs
    +      logInfo(s"Average: $averageBatchProcTime, ratio = $ratio" )
    +      if (ratio >= scalingUpRatio) {
    +        logDebug("Requesting executors")
    +        val numNewExecutors = math.max(math.round(ratio).toInt, 1)
    +        requestExecutors(numNewExecutors)
    +      } else if (ratio <= scalingDownRatio) {
    +        logDebug("Killing executors")
    +        killExecutor()
    +      }
    +    }
    +    batchProcTimeSum = 0
    +    batchProcTimeCount = 0
    +  }
    +
    +  private def requestExecutors(numNewExecutors: Int): Unit = {
    +    require(numNewExecutors >= 1)
    +    val allExecIds = client.getExecutorIds()
    +    logDebug(s"Executors (${allExecIds.size}) = ${allExecIds}")
    +    val targetTotalExecutors =
    +      math.max(math.min(maxNumExecutors, allExecIds.size + numNewExecutors), minNumExecutors)
    --- End diff --
    
    do we need to take into account pending executors? What about those pending to be removed?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12133][STREAMING] Streaming dynamic all...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12154#discussion_r58497305
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala ---
    @@ -234,6 +236,16 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
         }
       }
     
    +  def getAllocatedExecutors(): Map[Int, Option[String]] = {
    --- End diff --
    
    need to document and the int and string represent


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12133][STREAMING] Streaming dynamic all...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12154#discussion_r58762163
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala ---
    @@ -100,12 +115,13 @@ private[streaming] class ExecutorAllocationManager(
         logInfo(s"Requested total $targetTotalExecutors executors")
       }
     
    +  /** Kill a executors that is not running a receiver */
    --- End diff --
    
    grammar


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12133][STREAMING] Streaming dynamic all...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/12154#issuecomment-205609386
  
    **[Test build #2750 has finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/2750/consoleFull)** for PR 12154 at commit [`0c6d94b`](https://github.com/apache/spark/commit/0c6d94b54ff26ee58f5eb2c240768b51dd2227f2).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12133][STREAMING] Streaming dynamic all...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/12154#issuecomment-206572042
  
    **[Test build #55140 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/55140/consoleFull)** for PR 12154 at commit [`ce36c76`](https://github.com/apache/spark/commit/ce36c76fac5a8f7d9bafc6f53a266803b548e53f).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12133][STREAMING] Streaming dynamic all...

Posted by Colin-Yu <gi...@git.apache.org>.
Github user Colin-Yu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12154#discussion_r58483960
  
    --- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
    @@ -1360,6 +1360,16 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
         listenerBus.addListener(listener)
       }
     
    +  private[spark] override def getExecutorIds(): Seq[String] = {
    --- End diff --
    
    A newbie question: if a method has no side effect and return values, do code standard in spark suggest to remove parenthesis in method declaration?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #12154: [SPARK-12133][STREAMING] Streaming dynamic allocation

Posted by sugix <gi...@git.apache.org>.
Github user sugix commented on the issue:

    https://github.com/apache/spark/pull/12154
  
    @tdas - Why we cannot see this in the documentation and I am not sure if AWS EMR supports this feature? 


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12133][STREAMING] Streaming dynamic all...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12154#discussion_r58497208
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala ---
    @@ -93,6 +103,10 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
           receiverTracker.stop(processAllReceivedData)
         }
     
    +    if (executorAllocationManager != null) {
    --- End diff --
    
    I agree with Holden. I don't think it's possible for it to be null. The only chance that could happen is if you call `stop()` in the constructor before declaring all the variables, which is highly improbable.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-XXX][STREAMING] Streaming dynamic alloc...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/12154#issuecomment-205493606
  
    **[Test build #54893 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/54893/consoleFull)** for PR 12154 at commit [`81ad1dd`](https://github.com/apache/spark/commit/81ad1dd3ce7a87f81f317ff330cca215e8d4fc3a).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12133][STREAMING] Streaming dynamic all...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/12154#issuecomment-205531859
  
    **[Test build #54893 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/54893/consoleFull)** for PR 12154 at commit [`81ad1dd`](https://github.com/apache/spark/commit/81ad1dd3ce7a87f81f317ff330cca215e8d4fc3a).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12133][STREAMING] Streaming dynamic all...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/12154#issuecomment-206568674
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/55136/
    Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12133][STREAMING] Streaming dynamic all...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12154#discussion_r58758346
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala ---
    @@ -0,0 +1,217 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +
    +package org.apache.spark.streaming.scheduler
    +
    +import scala.util.Random
    +
    +import org.apache.spark.{ExecutorAllocationClient, SparkConf}
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.streaming.util.RecurringTimer
    +import org.apache.spark.util.{Clock, Utils}
    +
    +/**
    + * Class that manages executor allocated to a StreamingContext, and dynamically request or kill
    + * executors based on the statistics of the streaming computation. At a high level, the policy is:
    + * - Use StreamingListener interface get batch processing times of completed batches
    + * - Periodically take the average batch completion times and compare with the batch interval
    + * - If (avg. proc. time / batch interval) >= scaling up ratio, then request more executors
    + * - If (avg. proc. time / batch interval) <= scaling down ratio, then try to kill a executor that
    + *   is not running a receiver
    + */
    +private[streaming] class ExecutorAllocationManager(
    +    client: ExecutorAllocationClient,
    +    receiverTracker: ReceiverTracker,
    +    conf: SparkConf,
    +    batchDurationMs: Long,
    +    clock: Clock) extends StreamingListener with Logging {
    +
    +  import ExecutorAllocationManager._
    +
    +  private val scalingIntervalSecs = conf.getTimeAsSeconds(
    +    SCALING_INTERVAL_KEY,
    +    s"${SCALING_INTERVAL_DEFAULT_SECS}s")
    +  private val scalingUpRatio = conf.getDouble(SCALING_UP_RATIO_KEY, SCALING_UP_RATIO_DEFAULT)
    +  private val scalingDownRatio = conf.getDouble(SCALING_DOWN_RATIO_KEY, SCALING_DOWN_RATIO_DEFAULT)
    +  private val minNumExecutors = conf.getInt(
    +    MIN_EXECUTORS_KEY,
    +    math.max(1, receiverTracker.numReceivers))
    +  private val maxNumExecutors = conf.getInt(MAX_EXECUTORS_KEY, Integer.MAX_VALUE)
    +  private val timer = new RecurringTimer(clock, scalingIntervalSecs * 1000,
    +    _ => manageAllocation(), "streaming-executor-allocation-manager")
    +
    +  @volatile private var batchProcTimeSum = 0L
    +  @volatile private var batchProcTimeCount = 0
    +
    +  validateSettings()
    +
    +  def start(): Unit = {
    +    timer.start()
    +    logInfo(s"ExecutorAllocationManager started with " +
    +      s"ratios = [$scalingUpRatio, $scalingDownRatio] and interval = $scalingIntervalSecs sec")
    +  }
    +
    +  def stop(): Unit = {
    +    timer.stop(interruptTimer = true)
    +    logInfo("ExecutorAllocationManager stopped")
    +  }
    +
    +  private def manageAllocation(): Unit = synchronized {
    +    logInfo(s"Managing executor allocation with ratios = [$scalingUpRatio, $scalingDownRatio]")
    +    if (batchProcTimeCount > 0) {
    +      val averageBatchProcTime = batchProcTimeSum / batchProcTimeCount
    +      val ratio = averageBatchProcTime.toDouble / batchDurationMs
    +      logInfo(s"Average: $averageBatchProcTime, ratio = $ratio" )
    +      if (ratio >= scalingUpRatio) {
    +        logDebug("Requesting executors")
    +        val numNewExecutors = math.max(math.round(ratio).toInt, 1)
    +        requestExecutors(numNewExecutors)
    +      } else if (ratio <= scalingDownRatio) {
    +        logDebug("Killing executors")
    +        killExecutor()
    +      }
    +    }
    +    batchProcTimeSum = 0
    +    batchProcTimeCount = 0
    +  }
    +
    +  private def requestExecutors(numNewExecutors: Int): Unit = {
    +    require(numNewExecutors >= 1)
    +    val allExecIds = client.getExecutorIds()
    +    logDebug(s"Executors (${allExecIds.size}) = ${allExecIds}")
    +    val targetTotalExecutors =
    +      math.max(math.min(maxNumExecutors, allExecIds.size + numNewExecutors), minNumExecutors)
    +    client.requestTotalExecutors(targetTotalExecutors, 0, Map.empty)
    +    logInfo(s"Requested total $targetTotalExecutors executors")
    +  }
    +
    +  private def killExecutor(): Unit = {
    +    val allExecIds = client.getExecutorIds()
    +    logDebug(s"Executors (${allExecIds.size}) = ${allExecIds}")
    +
    +    if (allExecIds.nonEmpty && allExecIds.size > minNumExecutors) {
    +      val execIdsWithReceivers = receiverTracker.getAllocatedExecutors.values.flatten.toSeq
    +      logInfo(s"Executors with receivers (${execIdsWithReceivers.size}): ${execIdsWithReceivers}")
    +
    +      val removableExecIds = allExecIds.diff(execIdsWithReceivers)
    +      logDebug(s"Removable executors (${removableExecIds.size}): ${removableExecIds}")
    +      if (removableExecIds.nonEmpty) {
    +        val execIdToRemove = removableExecIds(Random.nextInt(removableExecIds.size))
    +        client.killExecutor(execIdToRemove)
    +        logInfo(s"Requested to kill executor $execIdToRemove")
    +      } else {
    +        logInfo(s"No non-receiver executors to kill")
    +      }
    +    } else {
    +      logInfo("No available executor to kill")
    +    }
    +  }
    +
    +  private def addBatchProcTime(timeMs: Long): Unit = synchronized {
    +    batchProcTimeSum += timeMs
    +    batchProcTimeCount += 1
    +    logDebug(
    +      s"Added batch processing time $timeMs, sum = $batchProcTimeSum, count = $batchProcTimeCount")
    +  }
    +
    +  private def validateSettings(): Unit = {
    +    require(
    +      scalingIntervalSecs > 0,
    +      s"Config $SCALING_INTERVAL_KEY must be more than 0")
    +
    +    require(
    +      scalingUpRatio > 0,
    +      s"Config $SCALING_UP_RATIO_KEY must be more than 0")
    +
    +    require(
    +      scalingDownRatio > 0,
    +      s"Config $SCALING_DOWN_RATIO_KEY must be more than 0")
    +
    +    require(
    +      minNumExecutors > 0,
    +      s"Config $MIN_EXECUTORS_KEY must be more than 0")
    +
    +    require(
    +      maxNumExecutors > 0,
    +      s"$MAX_EXECUTORS_KEY must be more than 0")
    +
    +    require(
    +      scalingUpRatio > scalingDownRatio,
    +      s"Config $SCALING_UP_RATIO_KEY must be more than config $SCALING_DOWN_RATIO_KEY")
    +
    +    if (conf.contains(MIN_EXECUTORS_KEY) && conf.contains(MAX_EXECUTORS_KEY)) {
    +      require(
    +        maxNumExecutors >= minNumExecutors,
    +        s"Config $MAX_EXECUTORS_KEY must be more than config $MIN_EXECUTORS_KEY")
    +    }
    +  }
    +
    +  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    +    logDebug("onBatchCompleted called: " + batchCompleted)
    +    if (!batchCompleted.batchInfo.outputOperationInfos.values.exists(_.failureReason.nonEmpty)) {
    +      batchCompleted.batchInfo.processingDelay.foreach(addBatchProcTime)
    +    }
    +  }
    +}
    +
    +private[streaming] object ExecutorAllocationManager extends Logging {
    +  val ENABLED_KEY = "spark.streaming.dynamicAllocation.enabled"
    +
    +  val SCALING_INTERVAL_KEY = "spark.streaming.dynamicAllocation.scalingInterval"
    +  val SCALING_INTERVAL_DEFAULT_SECS = 60
    +
    +  val SCALING_UP_RATIO_KEY = "spark.streaming.dynamicAllocation.scalingUpRatio"
    +  val SCALING_UP_RATIO_DEFAULT = 0.9
    +
    +  val SCALING_DOWN_RATIO_KEY = "spark.streaming.dynamicAllocation.scalingDownRatio"
    +  val SCALING_DOWN_RATIO_DEFAULT = 0.3
    +
    +  val MIN_EXECUTORS_KEY = "spark.streaming.dynamicAllocation.minExecutors"
    +
    +  val MAX_EXECUTORS_KEY = "spark.streaming.dynamicAllocation.maxExecutors"
    +
    +  def isDynamicAllocationEnabled(conf: SparkConf): Boolean = {
    +    val numExecutor = conf.getInt("spark.executor.instances", 0)
    +    val streamingDynamicAllocationEnabled = conf.getBoolean(ENABLED_KEY, false)
    +    if (numExecutor != 0 && streamingDynamicAllocationEnabled) {
    +      throw new IllegalArgumentException(
    +        "Dynamic Allocation and num executors both set, thus dynamic allocation disabled.")
    --- End diff --
    
    illegal configuration should be able to crash the driver, isnt it?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12133][STREAMING] Streaming dynamic all...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12154#discussion_r58495835
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala ---
    @@ -0,0 +1,217 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +
    +package org.apache.spark.streaming.scheduler
    +
    +import scala.util.Random
    +
    +import org.apache.spark.{ExecutorAllocationClient, SparkConf}
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.streaming.util.RecurringTimer
    +import org.apache.spark.util.{Clock, Utils}
    +
    +/**
    + * Class that manages executor allocated to a StreamingContext, and dynamically request or kill
    + * executors based on the statistics of the streaming computation. At a high level, the policy is:
    + * - Use StreamingListener interface get batch processing times of completed batches
    + * - Periodically take the average batch completion times and compare with the batch interval
    + * - If (avg. proc. time / batch interval) >= scaling up ratio, then request more executors
    --- End diff --
    
    more specifically, request at most 1 right?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12133][STREAMING] Streaming dynamic all...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on the pull request:

    https://github.com/apache/spark/pull/12154#issuecomment-212602051
  
    I see. I don't believe this depends on new APIs. You may have some difficulty just backporting into 1.6 in general for big patches, however.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org