You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by dhruve <gi...@git.apache.org> on 2017/08/15 15:01:21 UTC

[GitHub] spark pull request #18950: [SPARK-20589][Core][Scheduler] Allow limiting tas...

GitHub user dhruve opened a pull request:

    https://github.com/apache/spark/pull/18950

    [SPARK-20589][Core][Scheduler] Allow limiting task concurrency per job group

    ## What changes were proposed in this pull request?
    This change allows the user to specify the maximum no. of tasks running in a given job group. (Kindly see the jira comments section for more context on why this is implemented at a job group level rather than a stage level). This change is beneficial where the user wants to avoid having a DoS while trying to access an eternal service from multiple executors without having the need to repartition or coalesce existing RDDs.
    
    This code change introduces a new user level configuration: `spark.job.[userJobGroup].maxConcurrentTasks` which is used to set the active no. of tasks executing at a given point in time.
    
    The user can use the feature by setting the appropriate jobGroup and passing the conf:
    
    `conf.set("spark.job.group1.maxConcurrentTasks", "10")`
    `...`
    `sc.setJobGroup("group1", "", false)`
    `sc.parallelize(1 to 100000, 10).map(x => x + 1).count`
    `sc.clearJobGroup`
    
    `
    
    #### changes proposed in this fix 
    This change limits the no. of tasks (in turn also the no. of executors to be acquired) than can run simultaneously in a given job group and its subsequent job/s and stage/s if the appropriate job group and max concurrency configs are set.
    
    ## How was this patch tested?
    Ran unit tests and multiple manual tests with various combinations of:
    - single/multiple/no job groups
    - executors with single/multi cores
    - dynamic allocation on/off


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/dhruve/spark impr/SPARK-20589

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/18950.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #18950
    
----
commit 824396c82977171c38ab5d7f6c0f84bc19eccaba
Author: Dhruve Ashar <dh...@gmail.com>
Date:   2017-08-15T14:18:21Z

    [SPARK-20589] Allow limiting task concurrency per stage

commit d3f8162dab4ca7065d7f296fd03528ce6ddfb923
Author: Dhruve Ashar <dh...@gmail.com>
Date:   2017-08-15T14:45:18Z

    Merge branch 'master' of github.com:apache/spark into impr/SPARK-20589

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18950: [SPARK-20589][Core][Scheduler] Allow limiting task concu...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18950
  
    **[Test build #81427 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81427/testReport)** for PR 18950 at commit [`8b38300`](https://github.com/apache/spark/commit/8b3830004d69bd5f109fd9846f59583c23a910c7).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18950: [SPARK-20589][Core][Scheduler] Allow limiting task concu...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18950
  
    **[Test build #81464 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81464/testReport)** for PR 18950 at commit [`8b38300`](https://github.com/apache/spark/commit/8b3830004d69bd5f109fd9846f59583c23a910c7).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18950: [SPARK-20589][Core][Scheduler] Allow limiting task concu...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on the issue:

    https://github.com/apache/spark/pull/18950
  
    Jenkins, test this please


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #18950: [SPARK-20589][Core][Scheduler] Allow limiting tas...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18950#discussion_r133502797
  
    --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
    @@ -314,7 +316,7 @@ private[spark] class ExecutorAllocationManager(
           // Do not change our target while we are still initializing,
           // Otherwise the first job may have to ramp up unnecessarily
           0
    -    } else if (maxNeeded < numExecutorsTarget) {
    +    } else if (maxNeeded <= numExecutorsTarget) {
           // The target number exceeds the number we actually need, so stop adding new
    --- End diff --
    
    yeah I don't think this change hurts, but unless you have a reason for changing it, I'd prefer to leave it just for when we need to dig through history.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18950: [SPARK-20589][Core][Scheduler] Allow limiting task concu...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18950
  
    Merged build finished. Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18950: [SPARK-20589][Core][Scheduler] Allow limiting task concu...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18950
  
    **[Test build #80690 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/80690/testReport)** for PR 18950 at commit [`d3f8162`](https://github.com/apache/spark/commit/d3f8162dab4ca7065d7f296fd03528ce6ddfb923).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #18950: [SPARK-20589][Core][Scheduler] Allow limiting tas...

Posted by markhamstra <gi...@git.apache.org>.
Github user markhamstra commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18950#discussion_r133344532
  
    --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
    @@ -602,6 +604,21 @@ private[spark] class ExecutorAllocationManager(
         // place the executors.
         private val stageIdToExecutorPlacementHints = new mutable.HashMap[Int, (Int, Map[String, Int])]
     
    +    override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
    +      val jobGroupId = if (jobStart.properties != null) {
    +        jobStart.properties.getProperty(SparkContext.SPARK_JOB_GROUP_ID)
    +      } else {
    +        ""
    +      }
    +      val maxConcurrentTasks = conf.getInt(s"spark.job.$jobGroupId.maxConcurrentTasks",
    +        Int.MaxValue)
    +
    +      logInfo(s"Setting maximum concurrent tasks for group: ${jobGroupId} to $maxConcurrentTasks")
    +      allocationManager.synchronized {
    +        allocationManager.maxConcurrentTasks = maxConcurrentTasks
    --- End diff --
    
    Ummm... what? It is entirely possible to set a job group, spawn a bunch of threads that will eventually create jobs in that job group, then set another job group and spawn more threads that will be creating jobs in this new group simultaneously with jobs being created in the prior group.  


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #18950: [SPARK-20589][Core][Scheduler] Allow limiting tas...

Posted by dhruve <gi...@git.apache.org>.
GitHub user dhruve reopened a pull request:

    https://github.com/apache/spark/pull/18950

    [SPARK-20589][Core][Scheduler] Allow limiting task concurrency per job group

    ## What changes were proposed in this pull request?
    This change allows the user to specify the maximum no. of tasks running in a given job group. (Kindly see the jira comments section for more context on why this is implemented at a job group level rather than a stage level). This change is beneficial where the user wants to avoid having a DoS while trying to access an eternal service from multiple executors without having the need to repartition or coalesce existing RDDs.
    
    This code change introduces a new user level configuration: `spark.job.[userJobGroup].maxConcurrentTasks` which is used to set the active no. of tasks executing at a given point in time.
    
    The user can use the feature by setting the appropriate jobGroup and passing the conf:
    
    ```
    conf.set("spark.job.group1.maxConcurrentTasks", "10")
    ...
    sc.setJobGroup("group1", "", false)
    sc.parallelize(1 to 100000, 10).map(x => x + 1).count
    sc.clearJobGroup
    ```
    
    #### changes proposed in this fix 
    This change limits the no. of tasks (in turn also the no. of executors to be acquired) than can run simultaneously in a given job group and its subsequent job/s and stage/s if the appropriate job group and max concurrency configs are set.
    
    ## How was this patch tested?
    Ran unit tests and multiple manual tests with various combinations of:
    - single/multiple/no job groups
    - executors with single/multi cores
    - dynamic allocation on/off


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/dhruve/spark impr/SPARK-20589

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/18950.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #18950
    
----
commit 824396c82977171c38ab5d7f6c0f84bc19eccaba
Author: Dhruve Ashar <dh...@gmail.com>
Date:   2017-08-15T14:18:21Z

    [SPARK-20589] Allow limiting task concurrency per stage

commit d3f8162dab4ca7065d7f296fd03528ce6ddfb923
Author: Dhruve Ashar <dh...@gmail.com>
Date:   2017-08-15T14:45:18Z

    Merge branch 'master' of github.com:apache/spark into impr/SPARK-20589

commit 824621286ffb107010409c4d0d3442550628247d
Author: Dhruve Ashar <dh...@gmail.com>
Date:   2017-08-21T16:51:41Z

    Allow limiting task concurrency per stage in concurrent job groups

commit 517acb490ae5938a22c4175347f6bbc24b47781f
Author: Dhruve Ashar <dh...@gmail.com>
Date:   2017-08-21T19:30:17Z

    Remove comment

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18950: [SPARK-20589][Core][Scheduler] Allow limiting task concu...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18950
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/80938/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #18950: [SPARK-20589][Core][Scheduler] Allow limiting tas...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18950#discussion_r134501736
  
    --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
    @@ -724,6 +777,62 @@ private[spark] class ExecutorAllocationManager(
         }
     
         /**
    +     * Calculate the maximum no. of concurrent tasks that can run currently.
    +     */
    +    def getMaxConTasks(): Int = {
    +      val stagesByJobGroup = stageIdToNumTasks.groupBy(x => jobIdToJobGroup(stageIdToJobId(x._1)))
    --- End diff --
    
    I think this needs a comment explaining why you need to look at stages at all -- its not obvious why its necessary.  (at first I was going to suggest the number of tasks left in a stage shouldn't matter, but then realized that it could in some scenarios)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org

[GitHub] spark issue #18950: [SPARK-20589][Core][Scheduler] Allow limiting task concu...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18950
  
    **[Test build #80990 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/80990/testReport)** for PR 18950 at commit [`0e518f0`](https://github.com/apache/spark/commit/0e518f00ce97fd5d17fe89792c2503d2514b0473).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #18950: [SPARK-20589][Core][Scheduler] Allow limiting tas...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18950#discussion_r134501323
  
    --- Diff: core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala ---
    @@ -188,10 +192,195 @@ class ExecutorAllocationManagerSuite
         assert(numExecutorsTarget(manager) === 10)
       }
     
    +  test("add executors capped by max concurrent tasks for a job group with single core executors") {
    +    val conf = new SparkConf()
    +      .setMaster("myDummyLocalExternalClusterManager")
    +      .setAppName("test-executor-allocation-manager")
    +      .set("spark.dynamicAllocation.enabled", "true")
    +      .set("spark.dynamicAllocation.testing", "true")
    +      .set("spark.job.group1.maxConcurrentTasks", "2")
    +      .set("spark.job.group2.maxConcurrentTasks", "5")
    +    val sc = new SparkContext(conf)
    +    contexts += sc
    +    sc.setJobGroup("group1", "", false)
    +
    +    val manager = sc.executorAllocationManager.get
    +    val stages = Seq(createStageInfo(0, 10), createStageInfo(1, 10))
    +    // Submit the job and stage start/submit events
    +    sc.listenerBus.postToAll(SparkListenerJobStart(0, 0, stages, sc.getLocalProperties))
    +    sc.listenerBus.postToAll(SparkListenerStageSubmitted(stages(0)))
    +
    +    // Verify that we're capped at number of max concurrent tasks in the stage
    +    assert(maxNumExecutorsNeeded(manager) === 2)
    +
    +    // Submit another stage in the same job
    +    sc.listenerBus.postToAll(SparkListenerStageSubmitted(stages(1)))
    +    assert(maxNumExecutorsNeeded(manager) === 2)
    +
    +    sc.listenerBus.postToAll(SparkListenerStageCompleted(stages(0)))
    +    sc.listenerBus.postToAll(SparkListenerStageCompleted(stages(1)))
    +    sc.listenerBus.postToAll(SparkListenerJobEnd(0, 10, JobSucceeded))
    +
    +    // Submit a new job in the same job group
    +    val stage2 = createStageInfo(2, 20)
    +    sc.listenerBus.postToAll(SparkListenerJobStart(1, 0, Seq{stage2}, sc.getLocalProperties))
    --- End diff --
    
    nit: still a few `Seq{}` you missed


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org

[GitHub] spark pull request #18950: [SPARK-20589][Core][Scheduler] Allow limiting tas...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18950#discussion_r133505348
  
    --- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala ---
    @@ -1214,6 +1214,101 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
         verify(taskSetManagerSpy, times(1)).addPendingTask(anyInt())
       }
     
    +  test("limit max concurrent running tasks in a job group when configured ") {
    +    val conf = new SparkConf().
    +      set(config.BLACKLIST_ENABLED, true).
    +      set("spark.job.testJobGroup.maxConcurrentTasks", "2") // Limit max concurrent tasks to 2
    +
    +    sc = new SparkContext("local", "test", conf)
    +    sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
    +    val props = new Properties();
    +    props.setProperty(SparkContext.SPARK_JOB_GROUP_ID, "testJobGroup") // set the job group
    +
    +    val tasks = Array.tabulate[Task[_]](10) { i =>
    +      new FakeTask(0, i, Nil)
    +    }
    +    val tsm = new TaskSetManager(sched, new TaskSet(tasks, 0, 0, 0, props), 2)
    +
    +    // make some offers to our taskset
    +    var taskDescs = Seq(
    +      "exec1" -> "host1",
    +      "exec2" -> "host1"
    +    ).flatMap { case (exec, host) =>
    +      // offer each executor twice (simulating 2 cores per executor)
    +      (0 until 2).flatMap{ _ => tsm.resourceOffer(exec, host, TaskLocality.ANY)}
    +    }
    +    assert(taskDescs.size === 2) // Out of the 4 offers, tsm can accept up to maxConcurrentTasks.
    +
    +    // make 4 more offers
    +    val taskDescs2 = Seq(
    +      "exec1" -> "host1",
    +      "exec2" -> "host1"
    +    ).flatMap { case (exec, host) =>
    +      (0 until 2).flatMap{ _ => tsm.resourceOffer(exec, host, TaskLocality.ANY)}
    +    }
    +    assert(taskDescs2.size === 0) // tsm doesn't accept any as it is already running at max tasks
    +
    +    // inform tsm that one task has completed
    +    val directTaskResult = new DirectTaskResult[String](null, Seq()) {
    --- End diff --
    
    you can use `createTaskResult`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18950: [SPARK-20589][Core][Scheduler] Allow limiting task concu...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18950
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81464/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18950: [SPARK-20589][Core][Scheduler] Allow limiting task concu...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on the issue:

    https://github.com/apache/spark/pull/18950
  
    @dhruve looks like a real test failure


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #18950: [SPARK-20589][Core][Scheduler] Allow limiting tas...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18950#discussion_r134498890
  
    --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
    @@ -724,6 +777,62 @@ private[spark] class ExecutorAllocationManager(
         }
     
         /**
    +     * Calculate the maximum no. of concurrent tasks that can run currently.
    +     */
    +    def getMaxConTasks(): Int = {
    +      val stagesByJobGroup = stageIdToNumTasks.groupBy(x => jobIdToJobGroup(stageIdToJobId(x._1)))
    +
    +      def getMaxConTasks(maxConTasks: Int,
    +        stagesByJobGroupItr: Iterator[(String, mutable.HashMap[Int, Int])]): Int = {
    --- End diff --
    
    style nit: with multi-line method definitions, each param goes on its own line, double-indented:
    
    ```scala
    def getMaxConTasks(
        maxConTasks: Int,
        stagesByJobGroupItr: Iterator[(String, mutable.HashMap[Int, Int])]): Int = {
    ```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org

[GitHub] spark issue #18950: [SPARK-20589][Core][Scheduler] Allow limiting task concu...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on the issue:

    https://github.com/apache/spark/pull/18950
  
    Test this please


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #18950: [SPARK-20589][Core][Scheduler] Allow limiting tas...

Posted by dhruve <gi...@git.apache.org>.
Github user dhruve commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18950#discussion_r133547780
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
    @@ -454,64 +477,68 @@ private[spark] class TaskSetManager(
             }
           }
     
    -      dequeueTask(execId, host, allowedLocality).map { case ((index, taskLocality, speculative)) =>
    -        // Found a task; do some bookkeeping and return a task description
    -        val task = tasks(index)
    -        val taskId = sched.newTaskId()
    -        // Do various bookkeeping
    -        copiesRunning(index) += 1
    -        val attemptNum = taskAttempts(index).size
    -        val info = new TaskInfo(taskId, index, attemptNum, curTime,
    -          execId, host, taskLocality, speculative)
    -        taskInfos(taskId) = info
    -        taskAttempts(index) = info :: taskAttempts(index)
    -        // Update our locality level for delay scheduling
    -        // NO_PREF will not affect the variables related to delay scheduling
    -        if (maxLocality != TaskLocality.NO_PREF) {
    -          currentLocalityIndex = getLocalityIndex(taskLocality)
    -          lastLaunchTime = curTime
    -        }
    -        // Serialize and return the task
    -        val serializedTask: ByteBuffer = try {
    -          ser.serialize(task)
    -        } catch {
    -          // If the task cannot be serialized, then there's no point to re-attempt the task,
    -          // as it will always fail. So just abort the whole task-set.
    -          case NonFatal(e) =>
    -            val msg = s"Failed to serialize task $taskId, not attempting to retry it."
    -            logError(msg, e)
    -            abort(s"$msg Exception during serialization: $e")
    -            throw new TaskNotSerializableException(e)
    -        }
    -        if (serializedTask.limit > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024 &&
    -          !emittedTaskSizeWarning) {
    -          emittedTaskSizeWarning = true
    -          logWarning(s"Stage ${task.stageId} contains a task of very large size " +
    -            s"(${serializedTask.limit / 1024} KB). The maximum recommended task size is " +
    -            s"${TaskSetManager.TASK_SIZE_TO_WARN_KB} KB.")
    -        }
    -        addRunningTask(taskId)
    -
    -        // We used to log the time it takes to serialize the task, but task size is already
    -        // a good proxy to task serialization time.
    -        // val timeTaken = clock.getTime() - startTime
    -        val taskName = s"task ${info.id} in stage ${taskSet.id}"
    -        logInfo(s"Starting $taskName (TID $taskId, $host, executor ${info.executorId}, " +
    -          s"partition ${task.partitionId}, $taskLocality, ${serializedTask.limit} bytes)")
    -
    -        sched.dagScheduler.taskStarted(task, info)
    -        new TaskDescription(
    -          taskId,
    -          attemptNum,
    -          execId,
    -          taskName,
    -          index,
    -          addedFiles,
    -          addedJars,
    -          task.localProperties,
    -          serializedTask)
    +      dequeueTask(execId, host, allowedLocality).map {
    --- End diff --
    
    yes. Will fix it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #18950: [SPARK-20589][Core][Scheduler] Allow limiting tas...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18950#discussion_r133504041
  
    --- Diff: core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala ---
    @@ -188,6 +188,125 @@ class ExecutorAllocationManagerSuite
         assert(numExecutorsTarget(manager) === 10)
       }
     
    +  test("add executors capped by max concurrent tasks for a job group with single core executors") {
    +    val conf = new SparkConf()
    +      .setMaster("myDummyLocalExternalClusterManager")
    +      .setAppName("test-executor-allocation-manager")
    +      .set("spark.dynamicAllocation.enabled", "true")
    +      .set("spark.dynamicAllocation.testing", "true")
    +      .set("spark.job.group1.maxConcurrentTasks", "2")
    +      .set("spark.job.group2.maxConcurrentTasks", "5")
    +    val sc = new SparkContext(conf)
    +    contexts += sc
    +    sc.setJobGroup("group1", "", false)
    +
    +    val manager = sc.executorAllocationManager.get
    +    val stage0 = createStageInfo(0, 10)
    +    // Submit the job and stage start/submit events
    +    sc.listenerBus.postToAll(SparkListenerJobStart(0, 0, Seq{stage0}, sc.getLocalProperties))
    --- End diff --
    
    nit: parens, not braces: `Seq(stage0)`
    (throughout this test)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18950: [SPARK-20589][Core][Scheduler] Allow limiting task concu...

Posted by dhruve <gi...@git.apache.org>.
Github user dhruve commented on the issue:

    https://github.com/apache/spark/pull/18950
  
    @squito @markhamstra I addressed the comments and have made the changes to account for running different job groups concurrently.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #18950: [SPARK-20589][Core][Scheduler] Allow limiting tas...

Posted by dhruve <gi...@git.apache.org>.
Github user dhruve closed the pull request at:

    https://github.com/apache/spark/pull/18950


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18950: [SPARK-20589][Core][Scheduler] Allow limiting task concu...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18950
  
    Merged build finished. Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #18950: [SPARK-20589][Core][Scheduler] Allow limiting tas...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18950#discussion_r133311882
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
    @@ -454,64 +477,68 @@ private[spark] class TaskSetManager(
             }
           }
     
    -      dequeueTask(execId, host, allowedLocality).map { case ((index, taskLocality, speculative)) =>
    -        // Found a task; do some bookkeeping and return a task description
    -        val task = tasks(index)
    -        val taskId = sched.newTaskId()
    -        // Do various bookkeeping
    -        copiesRunning(index) += 1
    -        val attemptNum = taskAttempts(index).size
    -        val info = new TaskInfo(taskId, index, attemptNum, curTime,
    -          execId, host, taskLocality, speculative)
    -        taskInfos(taskId) = info
    -        taskAttempts(index) = info :: taskAttempts(index)
    -        // Update our locality level for delay scheduling
    -        // NO_PREF will not affect the variables related to delay scheduling
    -        if (maxLocality != TaskLocality.NO_PREF) {
    -          currentLocalityIndex = getLocalityIndex(taskLocality)
    -          lastLaunchTime = curTime
    -        }
    -        // Serialize and return the task
    -        val serializedTask: ByteBuffer = try {
    -          ser.serialize(task)
    -        } catch {
    -          // If the task cannot be serialized, then there's no point to re-attempt the task,
    -          // as it will always fail. So just abort the whole task-set.
    -          case NonFatal(e) =>
    -            val msg = s"Failed to serialize task $taskId, not attempting to retry it."
    -            logError(msg, e)
    -            abort(s"$msg Exception during serialization: $e")
    -            throw new TaskNotSerializableException(e)
    -        }
    -        if (serializedTask.limit > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024 &&
    -          !emittedTaskSizeWarning) {
    -          emittedTaskSizeWarning = true
    -          logWarning(s"Stage ${task.stageId} contains a task of very large size " +
    -            s"(${serializedTask.limit / 1024} KB). The maximum recommended task size is " +
    -            s"${TaskSetManager.TASK_SIZE_TO_WARN_KB} KB.")
    -        }
    -        addRunningTask(taskId)
    -
    -        // We used to log the time it takes to serialize the task, but task size is already
    -        // a good proxy to task serialization time.
    -        // val timeTaken = clock.getTime() - startTime
    -        val taskName = s"task ${info.id} in stage ${taskSet.id}"
    -        logInfo(s"Starting $taskName (TID $taskId, $host, executor ${info.executorId}, " +
    -          s"partition ${task.partitionId}, $taskLocality, ${serializedTask.limit} bytes)")
    -
    -        sched.dagScheduler.taskStarted(task, info)
    -        new TaskDescription(
    -          taskId,
    -          attemptNum,
    -          execId,
    -          taskName,
    -          index,
    -          addedFiles,
    -          addedJars,
    -          task.localProperties,
    -          serializedTask)
    +      dequeueTask(execId, host, allowedLocality).map {
    --- End diff --
    
    unintentional reformat?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #18950: [SPARK-20589][Core][Scheduler] Allow limiting tas...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18950#discussion_r134579683
  
    --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
    @@ -727,6 +780,68 @@ private[spark] class ExecutorAllocationManager(
         }
     
         /**
    +     * Calculate the maximum no. of concurrent tasks that can run currently.
    +     */
    +    def getMaxConTasks(): Int = {
    +      // We can limit the no. of concurrent tasks by a job group and multiple jobs can run with
    --- End diff --
    
    by a job group.  A job group can have multiple jobs with multiple stages.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18950: [SPARK-20589][Core][Scheduler] Allow limiting task concu...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18950
  
    **[Test build #80990 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/80990/testReport)** for PR 18950 at commit [`0e518f0`](https://github.com/apache/spark/commit/0e518f00ce97fd5d17fe89792c2503d2514b0473).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18950: [SPARK-20589][Core][Scheduler] Allow limiting task concu...

Posted by dhruve <gi...@git.apache.org>.
Github user dhruve commented on the issue:

    https://github.com/apache/spark/pull/18950
  
    CI is having issues downloading my repo. Closing this PR and opening a new one.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18950: [SPARK-20589][Core][Scheduler] Allow limiting task concu...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18950
  
    Merged build finished. Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18950: [SPARK-20589][Core][Scheduler] Allow limiting task concu...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18950
  
    **[Test build #80938 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/80938/testReport)** for PR 18950 at commit [`65941f7`](https://github.com/apache/spark/commit/65941f7884551e84a13a6cc2e7488a01e7d8beec).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #18950: [SPARK-20589][Core][Scheduler] Allow limiting tas...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18950#discussion_r134597897
  
    --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
    @@ -598,13 +600,58 @@ private[spark] class ExecutorAllocationManager(
         private val executorIdToTaskIds = new mutable.HashMap[String, mutable.HashSet[Long]]
         // Number of tasks currently running on the cluster.  Should be 0 when no stages are active.
         private var numRunningTasks: Int = _
    +    private val jobGroupToMaxConTasks = new mutable.HashMap[String, Int]
    +    private val jobIdToJobGroup = new mutable.HashMap[Int, String]
    +    private val stageIdToJobId = new mutable.HashMap[Int, Int]
    +    private val stageIdToCompleteTaskCount = new mutable.HashMap[Int, Int]
     
         // stageId to tuple (the number of task with locality preferences, a map where each pair is a
         // node and the number of tasks that would like to be scheduled on that node) map,
         // maintain the executor placement hints for each stage Id used by resource framework to better
         // place the executors.
         private val stageIdToExecutorPlacementHints = new mutable.HashMap[Int, (Int, Map[String, Int])]
     
    +    override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
    +      jobStart.stageInfos.foreach(stageInfo => stageIdToJobId(stageInfo.stageId) = jobStart.jobId)
    +
    +      var jobGroupId = if (jobStart.properties != null) {
    +        jobStart.properties.getProperty(SparkContext.SPARK_JOB_GROUP_ID)
    +      } else {
    +        null
    +      }
    +
    +      val maxConTasks = if (jobGroupId != null &&
    +        conf.contains(s"spark.job.$jobGroupId.maxConcurrentTasks")) {
    +        conf.get(s"spark.job.$jobGroupId.maxConcurrentTasks").toInt
    +      } else {
    +        Int.MaxValue
    +      }
    +
    +      if (maxConTasks <= 0) {
    +        throw new IllegalArgumentException(
    +          "Maximum Concurrent Tasks should be set greater than 0 for the job to progress.")
    +      }
    +
    +      if (jobGroupId == null || !conf.contains(s"spark.job.$jobGroupId.maxConcurrentTasks")) {
    +        jobGroupId = "default-group-" + jobStart.jobId.hashCode
    --- End diff --
    
    seems unlikely user would specify the same name but wonder if we add __ in front of it would be a bit more unique. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org

[GitHub] spark issue #18950: [SPARK-20589][Core][Scheduler] Allow limiting task concu...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18950
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/80937/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #18950: [SPARK-20589][Core][Scheduler] Allow limiting tas...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18950#discussion_r133482787
  
    --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
    @@ -314,7 +316,7 @@ private[spark] class ExecutorAllocationManager(
           // Do not change our target while we are still initializing,
           // Otherwise the first job may have to ramp up unnecessarily
           0
    -    } else if (maxNeeded < numExecutorsTarget) {
    +    } else if (maxNeeded <= numExecutorsTarget) {
           // The target number exceeds the number we actually need, so stop adding new
    --- End diff --
    
    I'm curious if we needed the = here?  It doesn't appear that it should be needed but perhaps there is corner case.  I don't think it hurts anything either so more curious if there is an existing bug.
    
    If we do keep the = we should update the comment here for is the number exceeds or is equal


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #18950: [SPARK-20589][Core][Scheduler] Allow limiting tas...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18950#discussion_r133504438
  
    --- Diff: core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala ---
    @@ -188,6 +188,125 @@ class ExecutorAllocationManagerSuite
         assert(numExecutorsTarget(manager) === 10)
       }
     
    +  test("add executors capped by max concurrent tasks for a job group with single core executors") {
    +    val conf = new SparkConf()
    +      .setMaster("myDummyLocalExternalClusterManager")
    +      .setAppName("test-executor-allocation-manager")
    +      .set("spark.dynamicAllocation.enabled", "true")
    +      .set("spark.dynamicAllocation.testing", "true")
    +      .set("spark.job.group1.maxConcurrentTasks", "2")
    +      .set("spark.job.group2.maxConcurrentTasks", "5")
    +    val sc = new SparkContext(conf)
    +    contexts += sc
    +    sc.setJobGroup("group1", "", false)
    +
    +    val manager = sc.executorAllocationManager.get
    +    val stage0 = createStageInfo(0, 10)
    +    // Submit the job and stage start/submit events
    +    sc.listenerBus.postToAll(SparkListenerJobStart(0, 0, Seq{stage0}, sc.getLocalProperties))
    +    sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage0))
    +
    +    // Verify that we're capped at number of max concurrent tasks in the stage
    +    assert(maxNumExecutorsNeeded(manager) === 2)
    +
    +    // Submit another stage in the same job
    +    val stage1 = createStageInfo(1, 10)
    +    sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage1))
    +    assert(maxNumExecutorsNeeded(manager) === 2)
    +
    +    sc.listenerBus.postToAll(SparkListenerStageCompleted(stage0))
    +    sc.listenerBus.postToAll(SparkListenerStageCompleted(stage1))
    +    sc.listenerBus.postToAll(SparkListenerJobEnd(0, 10, JobSucceeded))
    +
    +    // Submit a new job in the same job group
    +    val stage2 = createStageInfo(2, 20)
    +    sc.listenerBus.postToAll(SparkListenerJobStart(1, 0, Seq{stage2}, sc.getLocalProperties))
    +    sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage2))
    +    assert(maxNumExecutorsNeeded(manager) === 2)
    +
    +    sc.listenerBus.postToAll(SparkListenerStageCompleted(stage2))
    +    sc.listenerBus.postToAll(SparkListenerJobEnd(1, 10, JobSucceeded))
    +
    +    // Set another jobGroup
    +    sc.setJobGroup("group2", "", false)
    +
    +    val stage3 = createStageInfo(3, 20)
    +    sc.listenerBus.postToAll(SparkListenerJobStart(2, 0, Seq{stage3}, sc.getLocalProperties))
    +    sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage3))
    +    assert(maxNumExecutorsNeeded(manager) === 5)
    +
    +    sc.listenerBus.postToAll(SparkListenerStageCompleted(stage3))
    +    sc.listenerBus.postToAll(SparkListenerJobEnd(2, 10, JobSucceeded))
    +
    +    // Clear jobGroup
    +    sc.clearJobGroup()
    +
    +    val stage4 = createStageInfo(4, 50)
    +    sc.listenerBus.postToAll(SparkListenerJobStart(2, 0, Seq{stage4}, sc.getLocalProperties))
    +    sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage4))
    +    assert(maxNumExecutorsNeeded(manager) === 50)
    +  }
    +
    +  test("add executors capped by max concurrent tasks for a job group with multi cores executors") {
    +    val conf = new SparkConf()
    +      .setMaster("myDummyLocalExternalClusterManager")
    +      .setAppName("test-executor-allocation-manager")
    +      .set("spark.dynamicAllocation.enabled", "true")
    +      .set("spark.dynamicAllocation.testing", "true")
    +      .set("spark.job.group1.maxConcurrentTasks", "2")
    +      .set("spark.job.group2.maxConcurrentTasks", "5")
    +      .set("spark.executor.cores", "3")
    +    val sc = new SparkContext(conf)
    +    contexts += sc
    +    sc.setJobGroup("group1", "", false)
    +
    +    val manager = sc.executorAllocationManager.get
    +    val stage0 = createStageInfo(0, 10)
    +    // Submit the job and stage start/submit events
    +    sc.listenerBus.postToAll(SparkListenerJobStart(0, 0, Seq{stage0}, sc.getLocalProperties))
    +    sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage0))
    +
    +    // Verify that we're capped at number of max concurrent tasks in the stage
    +    assert(maxNumExecutorsNeeded(manager) === 1)
    +
    +    // Submit another stage in the same job
    +    val stage1 = createStageInfo(1, 10)
    +    sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage1))
    +    assert(maxNumExecutorsNeeded(manager) === 1)
    +
    +    sc.listenerBus.postToAll(SparkListenerStageCompleted(stage0))
    +    sc.listenerBus.postToAll(SparkListenerStageCompleted(stage1))
    +    sc.listenerBus.postToAll(SparkListenerJobEnd(0, 10, JobSucceeded))
    +
    +    // Submit a new job in the same job group
    +    val stage2 = createStageInfo(2, 20)
    +    sc.listenerBus.postToAll(SparkListenerJobStart(1, 0, Seq{stage2}, sc.getLocalProperties))
    +    sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage2))
    +    assert(maxNumExecutorsNeeded(manager) === 1)
    +
    +    sc.listenerBus.postToAll(SparkListenerStageCompleted(stage2))
    +    sc.listenerBus.postToAll(SparkListenerJobEnd(1, 10, JobSucceeded))
    +
    +    // Set another jobGroup
    +    sc.setJobGroup("group2", "", false)
    +
    +    val stage3 = createStageInfo(3, 20)
    +    sc.listenerBus.postToAll(SparkListenerJobStart(2, 0, Seq{stage3}, sc.getLocalProperties))
    +    sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage3))
    +    assert(maxNumExecutorsNeeded(manager) === 2)
    +
    +    sc.listenerBus.postToAll(SparkListenerStageCompleted(stage3))
    +    sc.listenerBus.postToAll(SparkListenerJobEnd(2, 10, JobSucceeded))
    +
    +    // Clear jobGroup
    +    sc.clearJobGroup()
    +
    +    val stage4 = createStageInfo(4, 50)
    +    sc.listenerBus.postToAll(SparkListenerJobStart(2, 0, Seq{stage4}, sc.getLocalProperties))
    +    sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage4))
    +    assert(maxNumExecutorsNeeded(manager) === 17)
    +  }
    --- End diff --
    
    you should also have a test for multiple concurrent jobs with different maxes.  Be sure to include a case where one is unbounded.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #18950: [SPARK-20589][Core][Scheduler] Allow limiting tas...

Posted by dhruve <gi...@git.apache.org>.
Github user dhruve commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18950#discussion_r133547976
  
    --- Diff: core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala ---
    @@ -188,6 +188,125 @@ class ExecutorAllocationManagerSuite
         assert(numExecutorsTarget(manager) === 10)
       }
     
    +  test("add executors capped by max concurrent tasks for a job group with single core executors") {
    +    val conf = new SparkConf()
    +      .setMaster("myDummyLocalExternalClusterManager")
    +      .setAppName("test-executor-allocation-manager")
    +      .set("spark.dynamicAllocation.enabled", "true")
    +      .set("spark.dynamicAllocation.testing", "true")
    +      .set("spark.job.group1.maxConcurrentTasks", "2")
    +      .set("spark.job.group2.maxConcurrentTasks", "5")
    +    val sc = new SparkContext(conf)
    +    contexts += sc
    +    sc.setJobGroup("group1", "", false)
    +
    +    val manager = sc.executorAllocationManager.get
    +    val stage0 = createStageInfo(0, 10)
    +    // Submit the job and stage start/submit events
    +    sc.listenerBus.postToAll(SparkListenerJobStart(0, 0, Seq{stage0}, sc.getLocalProperties))
    +    sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage0))
    +
    +    // Verify that we're capped at number of max concurrent tasks in the stage
    +    assert(maxNumExecutorsNeeded(manager) === 2)
    +
    +    // Submit another stage in the same job
    +    val stage1 = createStageInfo(1, 10)
    +    sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage1))
    +    assert(maxNumExecutorsNeeded(manager) === 2)
    +
    +    sc.listenerBus.postToAll(SparkListenerStageCompleted(stage0))
    +    sc.listenerBus.postToAll(SparkListenerStageCompleted(stage1))
    +    sc.listenerBus.postToAll(SparkListenerJobEnd(0, 10, JobSucceeded))
    +
    +    // Submit a new job in the same job group
    +    val stage2 = createStageInfo(2, 20)
    +    sc.listenerBus.postToAll(SparkListenerJobStart(1, 0, Seq{stage2}, sc.getLocalProperties))
    +    sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage2))
    +    assert(maxNumExecutorsNeeded(manager) === 2)
    +
    +    sc.listenerBus.postToAll(SparkListenerStageCompleted(stage2))
    +    sc.listenerBus.postToAll(SparkListenerJobEnd(1, 10, JobSucceeded))
    +
    +    // Set another jobGroup
    +    sc.setJobGroup("group2", "", false)
    +
    +    val stage3 = createStageInfo(3, 20)
    +    sc.listenerBus.postToAll(SparkListenerJobStart(2, 0, Seq{stage3}, sc.getLocalProperties))
    +    sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage3))
    +    assert(maxNumExecutorsNeeded(manager) === 5)
    +
    +    sc.listenerBus.postToAll(SparkListenerStageCompleted(stage3))
    +    sc.listenerBus.postToAll(SparkListenerJobEnd(2, 10, JobSucceeded))
    +
    +    // Clear jobGroup
    +    sc.clearJobGroup()
    +
    +    val stage4 = createStageInfo(4, 50)
    +    sc.listenerBus.postToAll(SparkListenerJobStart(2, 0, Seq{stage4}, sc.getLocalProperties))
    +    sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage4))
    +    assert(maxNumExecutorsNeeded(manager) === 50)
    +  }
    +
    +  test("add executors capped by max concurrent tasks for a job group with multi cores executors") {
    +    val conf = new SparkConf()
    +      .setMaster("myDummyLocalExternalClusterManager")
    +      .setAppName("test-executor-allocation-manager")
    +      .set("spark.dynamicAllocation.enabled", "true")
    +      .set("spark.dynamicAllocation.testing", "true")
    +      .set("spark.job.group1.maxConcurrentTasks", "2")
    +      .set("spark.job.group2.maxConcurrentTasks", "5")
    +      .set("spark.executor.cores", "3")
    +    val sc = new SparkContext(conf)
    +    contexts += sc
    +    sc.setJobGroup("group1", "", false)
    +
    +    val manager = sc.executorAllocationManager.get
    +    val stage0 = createStageInfo(0, 10)
    +    // Submit the job and stage start/submit events
    +    sc.listenerBus.postToAll(SparkListenerJobStart(0, 0, Seq{stage0}, sc.getLocalProperties))
    +    sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage0))
    +
    +    // Verify that we're capped at number of max concurrent tasks in the stage
    +    assert(maxNumExecutorsNeeded(manager) === 1)
    +
    +    // Submit another stage in the same job
    +    val stage1 = createStageInfo(1, 10)
    +    sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage1))
    +    assert(maxNumExecutorsNeeded(manager) === 1)
    +
    +    sc.listenerBus.postToAll(SparkListenerStageCompleted(stage0))
    +    sc.listenerBus.postToAll(SparkListenerStageCompleted(stage1))
    +    sc.listenerBus.postToAll(SparkListenerJobEnd(0, 10, JobSucceeded))
    +
    +    // Submit a new job in the same job group
    +    val stage2 = createStageInfo(2, 20)
    +    sc.listenerBus.postToAll(SparkListenerJobStart(1, 0, Seq{stage2}, sc.getLocalProperties))
    +    sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage2))
    +    assert(maxNumExecutorsNeeded(manager) === 1)
    +
    +    sc.listenerBus.postToAll(SparkListenerStageCompleted(stage2))
    +    sc.listenerBus.postToAll(SparkListenerJobEnd(1, 10, JobSucceeded))
    +
    +    // Set another jobGroup
    +    sc.setJobGroup("group2", "", false)
    +
    +    val stage3 = createStageInfo(3, 20)
    +    sc.listenerBus.postToAll(SparkListenerJobStart(2, 0, Seq{stage3}, sc.getLocalProperties))
    +    sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage3))
    +    assert(maxNumExecutorsNeeded(manager) === 2)
    +
    +    sc.listenerBus.postToAll(SparkListenerStageCompleted(stage3))
    +    sc.listenerBus.postToAll(SparkListenerJobEnd(2, 10, JobSucceeded))
    +
    +    // Clear jobGroup
    +    sc.clearJobGroup()
    +
    +    val stage4 = createStageInfo(4, 50)
    +    sc.listenerBus.postToAll(SparkListenerJobStart(2, 0, Seq{stage4}, sc.getLocalProperties))
    +    sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage4))
    +    assert(maxNumExecutorsNeeded(manager) === 17)
    +  }
    --- End diff --
    
    Yes. Will address it with the new changes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18950: [SPARK-20589][Core][Scheduler] Allow limiting task concu...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18950
  
    Merged build finished. Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #18950: [SPARK-20589][Core][Scheduler] Allow limiting tas...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18950#discussion_r134599045
  
    --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
    @@ -598,13 +600,58 @@ private[spark] class ExecutorAllocationManager(
         private val executorIdToTaskIds = new mutable.HashMap[String, mutable.HashSet[Long]]
         // Number of tasks currently running on the cluster.  Should be 0 when no stages are active.
         private var numRunningTasks: Int = _
    +    private val jobGroupToMaxConTasks = new mutable.HashMap[String, Int]
    +    private val jobIdToJobGroup = new mutable.HashMap[Int, String]
    +    private val stageIdToJobId = new mutable.HashMap[Int, Int]
    +    private val stageIdToCompleteTaskCount = new mutable.HashMap[Int, Int]
     
         // stageId to tuple (the number of task with locality preferences, a map where each pair is a
         // node and the number of tasks that would like to be scheduled on that node) map,
         // maintain the executor placement hints for each stage Id used by resource framework to better
         // place the executors.
         private val stageIdToExecutorPlacementHints = new mutable.HashMap[Int, (Int, Map[String, Int])]
     
    +    override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
    +      jobStart.stageInfos.foreach(stageInfo => stageIdToJobId(stageInfo.stageId) = jobStart.jobId)
    +
    +      var jobGroupId = if (jobStart.properties != null) {
    +        jobStart.properties.getProperty(SparkContext.SPARK_JOB_GROUP_ID)
    +      } else {
    +        null
    +      }
    +
    +      val maxConTasks = if (jobGroupId != null &&
    +        conf.contains(s"spark.job.$jobGroupId.maxConcurrentTasks")) {
    +        conf.get(s"spark.job.$jobGroupId.maxConcurrentTasks").toInt
    +      } else {
    +        Int.MaxValue
    +      }
    +
    +      if (maxConTasks <= 0) {
    +        throw new IllegalArgumentException(
    +          "Maximum Concurrent Tasks should be set greater than 0 for the job to progress.")
    +      }
    +
    +      if (jobGroupId == null || !conf.contains(s"spark.job.$jobGroupId.maxConcurrentTasks")) {
    +        jobGroupId = "default-group-" + jobStart.jobId.hashCode
    --- End diff --
    
    Actually I think we could just use 1 job group id for all the the jobs that don't have a group specified.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #18950: [SPARK-20589][Core][Scheduler] Allow limiting tas...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18950#discussion_r133499828
  
    --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
    @@ -602,6 +604,21 @@ private[spark] class ExecutorAllocationManager(
         // place the executors.
         private val stageIdToExecutorPlacementHints = new mutable.HashMap[Int, (Int, Map[String, Int])]
     
    +    override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
    +      val jobGroupId = if (jobStart.properties != null) {
    +        jobStart.properties.getProperty(SparkContext.SPARK_JOB_GROUP_ID)
    +      } else {
    +        ""
    +      }
    +      val maxConcurrentTasks = conf.getInt(s"spark.job.$jobGroupId.maxConcurrentTasks",
    +        Int.MaxValue)
    +
    +      logInfo(s"Setting maximum concurrent tasks for group: ${jobGroupId} to $maxConcurrentTasks")
    +      allocationManager.synchronized {
    +        allocationManager.maxConcurrentTasks = maxConcurrentTasks
    --- End diff --
    
    yeah mark is right.  after all, that is what separates a job group property from a global property for the entire spark context.
    
    I see why this is desirable for the most common case, of just running one job at a time, but to get this to work with multiple concurrent jobs (& job groups) you need to track a map from jobGroup -> maxConcurrency, and then sum that up (handling overflow for Int.MaxValue)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #18950: [SPARK-20589][Core][Scheduler] Allow limiting tas...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18950#discussion_r133311545
  
    --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
    @@ -602,6 +604,21 @@ private[spark] class ExecutorAllocationManager(
         // place the executors.
         private val stageIdToExecutorPlacementHints = new mutable.HashMap[Int, (Int, Map[String, Int])]
     
    +    override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
    +      val jobGroupId = if (jobStart.properties != null) {
    +        jobStart.properties.getProperty(SparkContext.SPARK_JOB_GROUP_ID)
    +      } else {
    +        ""
    +      }
    +      val maxConcurrentTasks = conf.getInt(s"spark.job.$jobGroupId.maxConcurrentTasks",
    +        Int.MaxValue)
    +
    +      logInfo(s"Setting maximum concurrent tasks for group: ${jobGroupId} to $maxConcurrentTasks")
    +      allocationManager.synchronized {
    +        allocationManager.maxConcurrentTasks = maxConcurrentTasks
    --- End diff --
    
    if you have multiple jobs running concurrently, you are not keeping a separate maxConcurrentTasks for each one.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18950: [SPARK-20589][Core][Scheduler] Allow limiting task concu...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18950
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18950: [SPARK-20589][Core][Scheduler] Allow limiting task concu...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18950
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18950: [SPARK-20589][Core][Scheduler] Allow limiting task concu...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18950
  
    **[Test build #80937 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/80937/testReport)** for PR 18950 at commit [`517acb4`](https://github.com/apache/spark/commit/517acb490ae5938a22c4175347f6bbc24b47781f).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18950: [SPARK-20589][Core][Scheduler] Allow limiting task concu...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18950
  
    **[Test build #80937 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/80937/testReport)** for PR 18950 at commit [`517acb4`](https://github.com/apache/spark/commit/517acb490ae5938a22c4175347f6bbc24b47781f).
     * This patch **fails Scala style tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18950: [SPARK-20589][Core][Scheduler] Allow limiting task concu...

Posted by dhruve <gi...@git.apache.org>.
Github user dhruve commented on the issue:

    https://github.com/apache/spark/pull/18950
  
    @squito I will pull the test from the latest master and update it with the changes we made.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18950: [SPARK-20589][Core][Scheduler] Allow limiting task concu...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18950
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/80990/
    Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18950: [SPARK-20589][Core][Scheduler] Allow limiting task concu...

Posted by dhruve <gi...@git.apache.org>.
Github user dhruve commented on the issue:

    https://github.com/apache/spark/pull/18950
  
    @kayousterhout @squito Can you review this PR ? Thanks. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18950: [SPARK-20589][Core][Scheduler] Allow limiting task concu...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18950
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81427/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18950: [SPARK-20589][Core][Scheduler] Allow limiting task concu...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18950
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/80690/
    Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #18950: [SPARK-20589][Core][Scheduler] Allow limiting tas...

Posted by dhruve <gi...@git.apache.org>.
Github user dhruve closed the pull request at:

    https://github.com/apache/spark/pull/18950


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #18950: [SPARK-20589][Core][Scheduler] Allow limiting tas...

Posted by dhruve <gi...@git.apache.org>.
Github user dhruve commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18950#discussion_r134608269
  
    --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
    @@ -598,13 +600,58 @@ private[spark] class ExecutorAllocationManager(
         private val executorIdToTaskIds = new mutable.HashMap[String, mutable.HashSet[Long]]
         // Number of tasks currently running on the cluster.  Should be 0 when no stages are active.
         private var numRunningTasks: Int = _
    +    private val jobGroupToMaxConTasks = new mutable.HashMap[String, Int]
    +    private val jobIdToJobGroup = new mutable.HashMap[Int, String]
    +    private val stageIdToJobId = new mutable.HashMap[Int, Int]
    +    private val stageIdToCompleteTaskCount = new mutable.HashMap[Int, Int]
     
         // stageId to tuple (the number of task with locality preferences, a map where each pair is a
         // node and the number of tasks that would like to be scheduled on that node) map,
         // maintain the executor placement hints for each stage Id used by resource framework to better
         // place the executors.
         private val stageIdToExecutorPlacementHints = new mutable.HashMap[Int, (Int, Map[String, Int])]
     
    +    override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
    +      jobStart.stageInfos.foreach(stageInfo => stageIdToJobId(stageInfo.stageId) = jobStart.jobId)
    +
    +      var jobGroupId = if (jobStart.properties != null) {
    +        jobStart.properties.getProperty(SparkContext.SPARK_JOB_GROUP_ID)
    +      } else {
    +        null
    +      }
    +
    +      val maxConTasks = if (jobGroupId != null &&
    +        conf.contains(s"spark.job.$jobGroupId.maxConcurrentTasks")) {
    +        conf.get(s"spark.job.$jobGroupId.maxConcurrentTasks").toInt
    +      } else {
    +        Int.MaxValue
    +      }
    +
    +      if (maxConTasks <= 0) {
    +        throw new IllegalArgumentException(
    +          "Maximum Concurrent Tasks should be set greater than 0 for the job to progress.")
    +      }
    +
    +      if (jobGroupId == null || !conf.contains(s"spark.job.$jobGroupId.maxConcurrentTasks")) {
    +        jobGroupId = "default-group-" + jobStart.jobId.hashCode
    --- End diff --
    
    Okay I will have the default job group named as `__default_job_group`. Let me know if we want to change it to a different one.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18950: [SPARK-20589][Core][Scheduler] Allow limiting task concu...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18950
  
    **[Test build #80690 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/80690/testReport)** for PR 18950 at commit [`d3f8162`](https://github.com/apache/spark/commit/d3f8162dab4ca7065d7f296fd03528ce6ddfb923).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #18950: [SPARK-20589][Core][Scheduler] Allow limiting tas...

Posted by dhruve <gi...@git.apache.org>.
Github user dhruve commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18950#discussion_r133547673
  
    --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
    @@ -602,6 +604,21 @@ private[spark] class ExecutorAllocationManager(
         // place the executors.
         private val stageIdToExecutorPlacementHints = new mutable.HashMap[Int, (Int, Map[String, Int])]
     
    +    override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
    +      val jobGroupId = if (jobStart.properties != null) {
    +        jobStart.properties.getProperty(SparkContext.SPARK_JOB_GROUP_ID)
    +      } else {
    +        ""
    +      }
    +      val maxConcurrentTasks = conf.getInt(s"spark.job.$jobGroupId.maxConcurrentTasks",
    +        Int.MaxValue)
    +
    +      logInfo(s"Setting maximum concurrent tasks for group: ${jobGroupId} to $maxConcurrentTasks")
    +      allocationManager.synchronized {
    +        allocationManager.maxConcurrentTasks = maxConcurrentTasks
    --- End diff --
    
    @markhamstra Thanks for the feedback. I missed the properties being serialized so we can have multiple job groups running simultaneously. I am working on changes to address your comments and will update the PR.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #18950: [SPARK-20589][Core][Scheduler] Allow limiting tas...

Posted by dhruve <gi...@git.apache.org>.
Github user dhruve commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18950#discussion_r133321125
  
    --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
    @@ -602,6 +604,21 @@ private[spark] class ExecutorAllocationManager(
         // place the executors.
         private val stageIdToExecutorPlacementHints = new mutable.HashMap[Int, (Int, Map[String, Int])]
     
    +    override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
    +      val jobGroupId = if (jobStart.properties != null) {
    +        jobStart.properties.getProperty(SparkContext.SPARK_JOB_GROUP_ID)
    +      } else {
    +        ""
    +      }
    +      val maxConcurrentTasks = conf.getInt(s"spark.job.$jobGroupId.maxConcurrentTasks",
    +        Int.MaxValue)
    +
    +      logInfo(s"Setting maximum concurrent tasks for group: ${jobGroupId} to $maxConcurrentTasks")
    +      allocationManager.synchronized {
    +        allocationManager.maxConcurrentTasks = maxConcurrentTasks
    --- End diff --
    
    I am sorry if the config name caused the confusion. The limit is per jobGroup and not per job.
    so we can really name it as `spark.jobGroup.[userJobGroup].maxConcurrentTasks`.
    
    Also spark allows us to set only a single job group at any given point in time with a single spark context.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18950: [SPARK-20589][Core][Scheduler] Allow limiting task concu...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18950
  
    **[Test build #81464 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81464/testReport)** for PR 18950 at commit [`8b38300`](https://github.com/apache/spark/commit/8b3830004d69bd5f109fd9846f59583c23a910c7).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18950: [SPARK-20589][Core][Scheduler] Allow limiting task concu...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18950
  
    **[Test build #81427 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81427/testReport)** for PR 18950 at commit [`8b38300`](https://github.com/apache/spark/commit/8b3830004d69bd5f109fd9846f59583c23a910c7).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18950: [SPARK-20589][Core][Scheduler] Allow limiting task concu...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18950
  
    **[Test build #80938 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/80938/testReport)** for PR 18950 at commit [`65941f7`](https://github.com/apache/spark/commit/65941f7884551e84a13a6cc2e7488a01e7d8beec).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #18950: [SPARK-20589][Core][Scheduler] Allow limiting tas...

Posted by dhruve <gi...@git.apache.org>.
Github user dhruve commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18950#discussion_r134608326
  
    --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
    @@ -727,6 +780,68 @@ private[spark] class ExecutorAllocationManager(
         }
     
         /**
    +     * Calculate the maximum no. of concurrent tasks that can run currently.
    +     */
    +    def getMaxConTasks(): Int = {
    +      // We can limit the no. of concurrent tasks by a job group and multiple jobs can run with
    --- End diff --
    
    Okay. I will update the comment to make it more clear.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #18950: [SPARK-20589][Core][Scheduler] Allow limiting tas...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18950#discussion_r134581946
  
    --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
    @@ -727,6 +780,68 @@ private[spark] class ExecutorAllocationManager(
         }
     
         /**
    +     * Calculate the maximum no. of concurrent tasks that can run currently.
    +     */
    +    def getMaxConTasks(): Int = {
    +      // We can limit the no. of concurrent tasks by a job group and multiple jobs can run with
    +      // multiple stages. We need to get all the active stages belonging to a job group to calculate
    +      // the total no. of pending + running tasks to decide the maximum no. of executors we need at
    +      // that time to serve the outstanding tasks. This is capped by the minimum of no. of
    +      // outstanding tasks and the max concurrent limit specified for the job group if any.
    +      val stagesByJobGroup = stageIdToNumTasks.groupBy(x => jobIdToJobGroup(stageIdToJobId(x._1)))
    +
    +      def getMaxConTasks(
    +          maxConTasks: Int,
    +          stagesByJobGroupItr: Iterator[(String, mutable.HashMap[Int, Int])]): Int = {
    +        if (stagesByJobGroupItr.hasNext) {
    +          val (jobGroupId, stages) = stagesByJobGroupItr.next
    +          // Get the total running and pending tasks for a job group.
    +          val totalIncompleteTasksForJobGroup = getIncompleteTasksForJobGroup(0, stages.iterator)
    +          val maxTasks = Math.min(jobGroupToMaxConTasks(jobGroupId),
    +            totalIncompleteTasksForJobGroup)
    +          if (doesSumOverflow(maxConTasks, maxTasks)) {
    +            Int.MaxValue
    +          } else {
    +            getMaxConTasks(maxConTasks + maxTasks, stagesByJobGroupItr)
    +          }
    +        } else {
    +          maxConTasks
    +        }
    +      }
    +
    +      // Get the total running & pending tasks for all stages in a job group.
    +      def getIncompleteTasksForJobGroup(totalTasks: Int, stagesItr: Iterator[(Int, Int)]): Int = {
    +        if (stagesItr.hasNext) {
    +          val (stageId, numTasks) = stagesItr.next
    +          val activeTasks = getIncompleteTasksForStage(stageId, numTasks)
    +          if (doesSumOverflow(totalTasks, activeTasks)) {
    +            Int.MaxValue
    +          } else {
    +            getIncompleteTasksForJobGroup(totalTasks + activeTasks, stagesItr)
    +          }
    +        } else {
    +          totalTasks
    +        }
    +      }
    +
    +      // Get the total running & pending tasks for a single stage.
    +      def getIncompleteTasksForStage(stageId: Int, numTasks: Int): Int = {
    +        var pendingTasks = numTasks
    +        if (stageIdToTaskIndices.contains(stageId)) {
    +          pendingTasks -= stageIdToTaskIndices(stageId).size
    +        }
    +        var runningTasks = 0
    +        if (stageIdToCompleteTaskCount.contains(stageId)) {
    +          runningTasks = stageIdToTaskIndices(stageId).size - stageIdToCompleteTaskCount(stageId)
    +        }
    +        pendingTasks + runningTasks
    +      }
    --- End diff --
    
    nit: add newline after just to make it more readable to see the call to getMaxConTasks


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org