You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by li-zhihui <gi...@git.apache.org> on 2014/05/28 08:18:45 UTC

[GitHub] spark pull request: submit stage after (configured ratio of) execu...

GitHub user li-zhihui opened a pull request:

    https://github.com/apache/spark/pull/900

    submit stage after (configured ratio of) executors have been registered

    Because creating TaskSetManager and registering executors are asynchronous, in most situation, early stages' tasks run without preferred locality.
    
    A simple solution is sleeping few seconds in application, so that executors have enough time to register.
    
    The PR add 2 configuration properties to make DAGScheduler submit stage after a few of executors have been registered. 
    
    # submit stage only after successfully registered executors arrived the ratio, default value 0
    spark.executor.registeredRatio = 0.8
    
    # whatever registeredRatio is arrived, submit stage after the maxRegisteredWaitingTime(millisecond), default value 10000
    spark.executor.maxRegisteredWaitingTime = 5000

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/li-zhihui/spark master

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/900.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #900
    
----
commit a9ee1cb3efbcb58b59f66b4ddecb515609b51b86
Author: li-zhihui <zh...@intel.com>
Date:   2014-05-28T05:35:13Z

    submit stage after (configured ratio of) executors have been registered

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

Posted by li-zhihui <gi...@git.apache.org>.
Github user li-zhihui commented on a diff in the pull request:

    https://github.com/apache/spark/pull/900#discussion_r14225520
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala ---
    @@ -95,6 +95,7 @@ private[spark] class SparkDeploySchedulerBackend(
     
       override def executorAdded(fullId: String, workerId: String, hostPort: String, cores: Int,
         memory: Int) {
    +    totalExecutors.addAndGet(1)
    --- End diff --
    
    Yes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on the pull request:

    https://github.com/apache/spark/pull/900#issuecomment-48604243
  
    postStartHook seems like a good place to put it.  It will only be called once there and as you said Yarn cluster mode actually waits for the sparkcontext to be initialized before allocating executors. 
    
    It looks like we aren't handling mesos.  We should atleast file a jira for this.
    @li-zhihui did you look at mesos at all?
    
    For the yarn side where you added the TODO's about the sleep. I think we can leave them here as there is another jira to remove them.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

Posted by li-zhihui <gi...@git.apache.org>.
Github user li-zhihui commented on a diff in the pull request:

    https://github.com/apache/spark/pull/900#discussion_r14225172
  
    --- Diff: yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala ---
    @@ -0,0 +1,51 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.scheduler.cluster
    +
    +
    +import org.apache.spark.{Logging, SparkContext}
    +import org.apache.spark.deploy.yarn.ApplicationMasterArguments
    +import org.apache.spark.scheduler.TaskSchedulerImpl
    +
    +import scala.collection.mutable.ArrayBuffer
    +
    +private[spark] class YarnClusterSchedulerBackend(
    +    scheduler: TaskSchedulerImpl,
    +    sc: SparkContext)
    +  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
    +  with Logging {
    +
    +  private[spark] def addArg(optionName: String, envVar: String, sysProp: String,
    +      arrayBuf: ArrayBuffer[String]) {
    +    if (System.getenv(envVar) != null) {
    +      arrayBuf += (optionName, System.getenv(envVar))
    +    } else if (sc.getConf.contains(sysProp)) {
    +      arrayBuf += (optionName, sc.getConf.get(sysProp))
    +    }
    +  }
    +
    +  override def start() {
    +    super.start()
    +    val argsArrayBuf = new ArrayBuffer[String]()
    +    List(("--num-executors", "SPARK_EXECUTOR_INSTANCES", "spark.executor.instances"),
    +      ("--num-executors", "SPARK_WORKER_INSTANCES", "spark.worker.instances"))
    +      .foreach { case (optName, envVar, sysProp) => addArg(optName, envVar, sysProp, argsArrayBuf) }
    +    val args = new ApplicationMasterArguments(argsArrayBuf.toArray)
    +    totalExecutors.set(args.numExecutors)
    --- End diff --
    
    @kayousterhout If we removed creating ApplicationMasterArguments, we must assign default value (=2) of numExecutors in this class, that will lead to duplicate setting. Unless, we extract the value as a static constant. 
    BTW, I think constants reference is a little confused in Spark.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1946] Submit stage after (configured ra...

Posted by adrian-wang <gi...@git.apache.org>.
Github user adrian-wang commented on a diff in the pull request:

    https://github.com/apache/spark/pull/900#discussion_r13895173
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala ---
    @@ -225,6 +232,17 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
             throw new SparkException("Error notifying standalone scheduler's driver actor", e)
         }
       }
    +
    +  override def isReady(): Boolean = {
    +    if (ready){
    +      return true
    +    }
    +    if ((System.currentTimeMillis() - createTime) >= maxRegisteredWaitingTime) {
    +      ready = true
    +      return true
    +    }
    +    return false
    --- End diff --
    
    override def isReady(): Boolean = {
      if ((System.currentTimeMillis() - createTime) >= maxRegisteredWaitingTime) {
        ready = true
      }
      ready
    }


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/900#discussion_r14238510
  
    --- Diff: yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala ---
    @@ -0,0 +1,39 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.scheduler.cluster
    +
    +import org.apache.spark.SparkContext
    +import org.apache.spark.scheduler.TaskSchedulerImpl
    +import org.apache.spark.util.IntParam
    +
    +private[spark] class YarnClusterSchedulerBackend(
    +    scheduler: TaskSchedulerImpl,
    +    sc: SparkContext)
    +  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) {
    +
    +  override def start() {
    +    super.start()
    +    var numExecutors = 2
    +    if (sc.getConf.contains("spark.executor.instances")) {
    +      numExecutors = sc.getConf.getInt("spark.executor.instances", 2)
    +    } else if (System.getenv("SPARK_EXECUTOR_INSTANCES") != null) {
    +      IntParam.unapply(System.getenv("SPARK_EXECUTOR_INSTANCES")).map(_.toInt).getOrElse(2)
    --- End diff --
    
    you aren't setting numExectuors.  Also doesn't IntParam already give you an int (Option[Int]) so no need to do toInt again.  


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

Posted by li-zhihui <gi...@git.apache.org>.
Github user li-zhihui commented on a diff in the pull request:

    https://github.com/apache/spark/pull/900#discussion_r14280510
  
    --- Diff: yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala ---
    @@ -0,0 +1,39 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.scheduler.cluster
    +
    +import org.apache.spark.SparkContext
    +import org.apache.spark.scheduler.TaskSchedulerImpl
    +import org.apache.spark.util.IntParam
    +
    +private[spark] class YarnClusterSchedulerBackend(
    +    scheduler: TaskSchedulerImpl,
    +    sc: SparkContext)
    +  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) {
    +
    +  override def start() {
    +    super.start()
    +    var numExecutors = 2
    +    if (sc.getConf.contains("spark.executor.instances")) {
    +      numExecutors = sc.getConf.getInt("spark.executor.instances", 2)
    --- End diff --
    
    Cool !


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1946] Submit stage after (configured nu...

Posted by sryza <gi...@git.apache.org>.
Github user sryza commented on a diff in the pull request:

    https://github.com/apache/spark/pull/900#discussion_r13740722
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala ---
    @@ -48,6 +48,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
       var totalCoreCount = new AtomicInteger(0)
       val conf = scheduler.sc.conf
       private val timeout = AkkaUtils.askTimeout(conf)
    +  val minRegisteredNum = conf.getDouble("spark.executor.minRegisteredNum", 0)
    --- End diff --
    
    +1 for using a percentage.   spark.scheduler seems like the right prefix (and we should use it for maxRegisteredWaitingTime as well)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on the pull request:

    https://github.com/apache/spark/pull/900#issuecomment-47104091
  
    thanks @li-zhihui. I was actually referring to modifying the user docs to add the new configs.  look in docs/configuration.md.   
    
    It makes sense to move it down and get as much initialization stuff out of the way before waiting.  To me exactly which class it goes in depends on how we see it fitting and potentially being used in the future.  You could for instance move it down into submitMissingTasks before the call to submitTasks and leave it in DAGScheduler instead.
    
    I think for this pr where we are just checking initially (job submission) that we have enough executors it doesn't matter to much.  But in the future if we would want to check between stages or potentially when adding tasks then it matters where it goes.  
    
    perhaps @kayousterhout has opinion on where it better fits?   



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

Posted by li-zhihui <gi...@git.apache.org>.
Github user li-zhihui commented on the pull request:

    https://github.com/apache/spark/pull/900#issuecomment-47188350
  
    @tgravescs @kayousterhout 
    How about move waitBackendReady to TaskSchedulerImpl.start. It will be called only once at spark initialization.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

Posted by kayousterhout <gi...@git.apache.org>.
Github user kayousterhout commented on a diff in the pull request:

    https://github.com/apache/spark/pull/900#discussion_r14203837
  
    --- Diff: yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala ---
    @@ -0,0 +1,51 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.scheduler.cluster
    +
    +
    +import org.apache.spark.{Logging, SparkContext}
    +import org.apache.spark.deploy.yarn.ApplicationMasterArguments
    +import org.apache.spark.scheduler.TaskSchedulerImpl
    +
    +import scala.collection.mutable.ArrayBuffer
    +
    +private[spark] class YarnClusterSchedulerBackend(
    +    scheduler: TaskSchedulerImpl,
    +    sc: SparkContext)
    +  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
    +  with Logging {
    +
    +  private[spark] def addArg(optionName: String, envVar: String, sysProp: String,
    +      arrayBuf: ArrayBuffer[String]) {
    +    if (System.getenv(envVar) != null) {
    +      arrayBuf += (optionName, System.getenv(envVar))
    +    } else if (sc.getConf.contains(sysProp)) {
    +      arrayBuf += (optionName, sc.getConf.get(sysProp))
    +    }
    +  }
    +
    +  override def start() {
    +    super.start()
    +    val argsArrayBuf = new ArrayBuffer[String]()
    +    List(("--num-executors", "SPARK_EXECUTOR_INSTANCES", "spark.executor.instances"),
    +      ("--num-executors", "SPARK_WORKER_INSTANCES", "spark.worker.instances"))
    +      .foreach { case (optName, envVar, sysProp) => addArg(optName, envVar, sysProp, argsArrayBuf) }
    +    val args = new ApplicationMasterArguments(argsArrayBuf.toArray)
    +    totalExecutors.set(args.numExecutors)
    --- End diff --
    
    Yeah so my point was can you just replace this code and the addArg() function with something simpler that avoids creating this list of options and passing them to ApplicationMasterArguments?  So, just check to see if SPARK_EXECUTOR_INSTANCES/SPARK_WORKER_INSTANCES/spark.executor.instances/spark.worker.instances are set and if so, set totalExecutors accordingly?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1946] Submit stage after (configured ra...

Posted by kayousterhout <gi...@git.apache.org>.
Github user kayousterhout commented on a diff in the pull request:

    https://github.com/apache/spark/pull/900#discussion_r13669385
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
    @@ -431,6 +431,16 @@ private[spark] class TaskSchedulerImpl(
     
       // By default, rack is unknown
       def getRackForHost(value: String): Option[String] = None
    +  override def waitBackendReady():Unit={
    +    if(backend.isReady){
    +      return
    +    }
    +    while(!backend.isReady){
    +      synchronized{
    +        this.wait(100)
    --- End diff --
    
    Is there a reason for not using wait/notify here?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

Posted by kayousterhout <gi...@git.apache.org>.
Github user kayousterhout commented on a diff in the pull request:

    https://github.com/apache/spark/pull/900#discussion_r14202320
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala ---
    @@ -46,9 +46,17 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
     {
       // Use an atomic variable to track total number of cores in the cluster for simplicity and speed
       var totalCoreCount = new AtomicInteger(0)
    +  var totalExecutors = new AtomicInteger(0)
       val conf = scheduler.sc.conf
       private val timeout = AkkaUtils.askTimeout(conf)
       private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
    +  // Submit tasks only after (registered executors / total executors) arrived the ratio.
    --- End diff --
    
    "arrived the ratio" --> "is equal to at least this value"


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

Posted by li-zhihui <gi...@git.apache.org>.
Github user li-zhihui commented on the pull request:

    https://github.com/apache/spark/pull/900#issuecomment-47330805
  
    @tgravescs @kayousterhout 
    I add a new commit according to comments.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

Posted by kayousterhout <gi...@git.apache.org>.
Github user kayousterhout commented on a diff in the pull request:

    https://github.com/apache/spark/pull/900#discussion_r14280473
  
    --- Diff: yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala ---
    @@ -0,0 +1,39 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.scheduler.cluster
    +
    +import org.apache.spark.SparkContext
    +import org.apache.spark.scheduler.TaskSchedulerImpl
    +import org.apache.spark.util.IntParam
    +
    +private[spark] class YarnClusterSchedulerBackend(
    +    scheduler: TaskSchedulerImpl,
    +    sc: SparkContext)
    +  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) {
    +
    +  override def start() {
    +    super.start()
    +    var numExecutors = 2
    +    if (sc.getConf.contains("spark.executor.instances")) {
    +      numExecutors = sc.getConf.getInt("spark.executor.instances", 2)
    --- End diff --
    
    (so you don't override it if spark.executor.instances is not already set)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1946] Submit stage after (configured ra...

Posted by kayousterhout <gi...@git.apache.org>.
Github user kayousterhout commented on the pull request:

    https://github.com/apache/spark/pull/900#issuecomment-45787108
  
    Also, @li-zhihui, can you add a unit test for this?
    
    Second, it looks like this only works in standalone mode and not for YARN (since, as I understand the YARN code, YARN uses YarnClientSchedulerBackend and not SparkDeploySchedulerBackend)?  Was that the intention?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

Posted by li-zhihui <gi...@git.apache.org>.
Github user li-zhihui commented on a diff in the pull request:

    https://github.com/apache/spark/pull/900#discussion_r14231480
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala ---
    @@ -244,6 +255,17 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
             throw new SparkException("Error notifying standalone scheduler's driver actor", e)
         }
       }
    +
    +  override def isReady(): Boolean = {
    +    if (ready) {
    --- End diff --
    
    Thanks @pwendell @kayousterhout I am more thoughtful about these code's performance. ^_^
    But we can't simply inline the code because executorActor is a member of inner class DriverActor. Although we can get the member by adding some code, I don't sure it cost to do.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

Posted by li-zhihui <gi...@git.apache.org>.
Github user li-zhihui commented on the pull request:

    https://github.com/apache/spark/pull/900#issuecomment-47204723
  
    @tgravescs @kayousterhout 
    I add a new commit
    
    * Move waitBackendReady to TaskSchedulerImpl.start
    * Code refactor by @kayousterhout 's comments


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1946] Submit stage after (configured ra...

Posted by kayousterhout <gi...@git.apache.org>.
Github user kayousterhout commented on a diff in the pull request:

    https://github.com/apache/spark/pull/900#discussion_r14032829
  
    --- Diff: yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala ---
    @@ -0,0 +1,51 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.scheduler.cluster
    +
    +
    +import org.apache.spark.{Logging, SparkContext}
    +import org.apache.spark.deploy.yarn.ApplicationMasterArguments
    +import org.apache.spark.scheduler.TaskSchedulerImpl
    +
    +import scala.collection.mutable.ArrayBuffer
    +
    +private[spark] class YarnClusterSchedulerBackend(
    +    scheduler: TaskSchedulerImpl,
    +    sc: SparkContext)
    +  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
    +  with Logging {
    +
    +  private[spark] def addArg(optionName: String, envVar: String, sysProp: String,
    --- End diff --
    
    Why is this private[spark] as opposed to just private?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1946] Submit stage after (configured nu...

Posted by li-zhihui <gi...@git.apache.org>.
Github user li-zhihui commented on the pull request:

    https://github.com/apache/spark/pull/900#issuecomment-45844273
  
    Thanks @sryza  How about spark.scheduler.minRegisteredExecutors?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

Posted by kayousterhout <gi...@git.apache.org>.
Github user kayousterhout commented on the pull request:

    https://github.com/apache/spark/pull/900#issuecomment-47148880
  
    @tgravescs @li-zhihui I would be in favor of moving the wait earlier -- because currently every job will call waitBackendReady(), but really this should just be called once when the app is initialized (since once it is ready, it will never become un-ready with the current code).  If at some point this gets used to check the state of things between stages, as @tgravescs suggested, we can then move it to be checked before each stage.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1946] Submit stage after (configured nu...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/900#discussion_r13709414
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala ---
    @@ -48,6 +48,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
       var totalCoreCount = new AtomicInteger(0)
       val conf = scheduler.sc.conf
       private val timeout = AkkaUtils.askTimeout(conf)
    +  val minRegisteredNum = conf.getDouble("spark.executor.minRegisteredNum", 0)
    --- End diff --
    
    ultimately I would prefer the end user config for yarn to be a percentage. That makes it easier to set a decent default across the entire cluster (other then just off).    I guess if we had to we could have another config for yarn which ends up being applied to the # of executors requested and sets this conf for the user, but that is just one more config.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1946] Submit stage after (configured ra...

Posted by li-zhihui <gi...@git.apache.org>.
Github user li-zhihui commented on a diff in the pull request:

    https://github.com/apache/spark/pull/900#discussion_r14060589
  
    --- Diff: yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala ---
    @@ -0,0 +1,51 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.scheduler.cluster
    +
    +
    +import org.apache.spark.{Logging, SparkContext}
    +import org.apache.spark.deploy.yarn.ApplicationMasterArguments
    +import org.apache.spark.scheduler.TaskSchedulerImpl
    +
    +import scala.collection.mutable.ArrayBuffer
    +
    +private[spark] class YarnClusterSchedulerBackend(
    +    scheduler: TaskSchedulerImpl,
    +    sc: SparkContext)
    +  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
    +  with Logging {
    +
    +  private[spark] def addArg(optionName: String, envVar: String, sysProp: String,
    +      arrayBuf: ArrayBuffer[String]) {
    +    if (System.getenv(envVar) != null) {
    +      arrayBuf += (optionName, System.getenv(envVar))
    +    } else if (sc.getConf.contains(sysProp)) {
    +      arrayBuf += (optionName, sc.getConf.get(sysProp))
    +    }
    +  }
    +
    +  override def start() {
    +    super.start()
    +    val argsArrayBuf = new ArrayBuffer[String]()
    +    List(("--num-executors", "SPARK_EXECUTOR_INSTANCES", "spark.executor.instances"),
    +      ("--num-executors", "SPARK_WORKER_INSTANCES", "spark.worker.instances"))
    +      .foreach { case (optName, envVar, sysProp) => addArg(optName, envVar, sysProp, argsArrayBuf) }
    +    val args = new ApplicationMasterArguments(argsArrayBuf.toArray)
    +    totalExecutors.set(args.numExecutors)
    --- End diff --
    
    @kayousterhout Here ApplicationMaterArguments is used to get default value of numExecutors (It's 2, now).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

Posted by kayousterhout <gi...@git.apache.org>.
Github user kayousterhout commented on a diff in the pull request:

    https://github.com/apache/spark/pull/900#discussion_r14205738
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala ---
    @@ -95,6 +95,7 @@ private[spark] class SparkDeploySchedulerBackend(
     
       override def executorAdded(fullId: String, workerId: String, hostPort: String, cores: Int,
         memory: Int) {
    +    totalExecutors.addAndGet(1)
    --- End diff --
    
    Is there a race condition here, where isReady() can return true because totalExecutors has not been correctly set yet?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1946] Submit stage after (configured ra...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on the pull request:

    https://github.com/apache/spark/pull/900#issuecomment-45783481
  
    Thanks @mridulm for the clarification.  So it seems like this change would be useful and is similar to what we discussed in SPARK-1453.  Are there any general concerns over adding configs to wait?  If  
    
    I'm seeing more people running into this and would like to get something implemented so each user doesn't have to put sleep in their code.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1946] Submit stage after (configured ra...

Posted by li-zhihui <gi...@git.apache.org>.
Github user li-zhihui commented on a diff in the pull request:

    https://github.com/apache/spark/pull/900#discussion_r13851868
  
    --- Diff: yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala ---
    @@ -77,6 +77,12 @@ private[spark] class YarnClientSchedulerBackend(
     
         logDebug("ClientArguments called with: " + argsArrayBuf)
         val args = new ClientArguments(argsArrayBuf.toArray, conf)
    +    totalExecutors.set(args.numExecutors)
    --- End diff --
    
    Thanks @tgravescs 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/900#discussion_r14245269
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala ---
    @@ -46,9 +46,18 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
     {
       // Use an atomic variable to track total number of cores in the cluster for simplicity and speed
       var totalCoreCount = new AtomicInteger(0)
    +  var totalExpectedExecutors = new AtomicInteger(0)
       val conf = scheduler.sc.conf
       private val timeout = AkkaUtils.askTimeout(conf)
       private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
    +  // Submit tasks only after (registered executors / total expected executors) 
    +  // is equal to at least this value.
    +  var minRegisteredRatio = conf.getDouble("spark.scheduler.minRegisteredExecutorsRatio", 0)
    --- End diff --
    
     We should add a check for > 1 and set to 1 if over. I initially set it to 40 in a test thinking that meant 40%.  I guess the documentation will also clarify.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

Posted by kayousterhout <gi...@git.apache.org>.
Github user kayousterhout commented on a diff in the pull request:

    https://github.com/apache/spark/pull/900#discussion_r14280284
  
    --- Diff: yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala ---
    @@ -0,0 +1,39 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.scheduler.cluster
    +
    +import org.apache.spark.SparkContext
    +import org.apache.spark.scheduler.TaskSchedulerImpl
    +import org.apache.spark.util.IntParam
    +
    +private[spark] class YarnClusterSchedulerBackend(
    +    scheduler: TaskSchedulerImpl,
    +    sc: SparkContext)
    +  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) {
    +
    +  override def start() {
    +    super.start()
    +    var numExecutors = 2
    +    if (sc.getConf.contains("spark.executor.instances")) {
    +      numExecutors = sc.getConf.getInt("spark.executor.instances", 2)
    +    } else if (System.getenv("SPARK_EXECUTOR_INSTANCES") != null) {
    +      IntParam.unapply(System.getenv("SPARK_EXECUTOR_INSTANCES")).map(_.toInt).getOrElse(2)
    --- End diff --
    
    Ah cool that makes sense


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1946] Submit stage after (configured ra...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/900#discussion_r13977260
  
    --- Diff: yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---
    @@ -164,6 +164,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
     
       private def startUserClass(): Thread = {
         logInfo("Starting the user JAR in a separate Thread")
    +    System.setProperty("spark.executor.instances",args.numExecutors.toString)
    --- End diff --
    
    I'm assuming you need to change the yarn/alpha version of this also.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

Posted by kayousterhout <gi...@git.apache.org>.
Github user kayousterhout commented on a diff in the pull request:

    https://github.com/apache/spark/pull/900#discussion_r14280315
  
    --- Diff: yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala ---
    @@ -0,0 +1,39 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.scheduler.cluster
    +
    +import org.apache.spark.SparkContext
    +import org.apache.spark.scheduler.TaskSchedulerImpl
    +import org.apache.spark.util.IntParam
    +
    +private[spark] class YarnClusterSchedulerBackend(
    +    scheduler: TaskSchedulerImpl,
    +    sc: SparkContext)
    +  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) {
    +
    +  override def start() {
    +    super.start()
    +    var numExecutors = 2
    --- End diff --
    
    I agree that those other constants could be defined in a better way -- but my understanding is that this case is more extreme, because if you defined numExecutors to be something other than 2 here, there would actually be a bug right, because the scheduler would be waiting for the wrong number of executors to start?
    
    In any case, this PR doesn't need to fix all of the constants in the project, but any new constants added should be added properly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

Posted by li-zhihui <gi...@git.apache.org>.
Github user li-zhihui commented on the pull request:

    https://github.com/apache/spark/pull/900#issuecomment-47490304
  
    @tgravescs @kayousterhout 
    It will lead to a logic deadlock in yarn-cluster mode, if waitBackendReady is in TaskSchedulerImpl.start.
    
    How about move it (waitBackendReady) to postStartHook() ?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

Posted by kayousterhout <gi...@git.apache.org>.
Github user kayousterhout commented on a diff in the pull request:

    https://github.com/apache/spark/pull/900#discussion_r14205805
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala ---
    @@ -46,9 +46,17 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
     {
       // Use an atomic variable to track total number of cores in the cluster for simplicity and speed
       var totalCoreCount = new AtomicInteger(0)
    +  var totalExecutors = new AtomicInteger(0)
    --- End diff --
    
    Can you add a comment here saying this is the total number of executors we expect to be launched for this cluster?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1946] Submit stage after (configured ra...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on the pull request:

    https://github.com/apache/spark/pull/900#issuecomment-46582362
  
    thanks @li-zhihui. I made a few small comments.  I haven't had time yet to actually try this out, I hope to later today or tomorrow.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

Posted by kayousterhout <gi...@git.apache.org>.
Github user kayousterhout commented on a diff in the pull request:

    https://github.com/apache/spark/pull/900#discussion_r14280133
  
    --- Diff: yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala ---
    @@ -0,0 +1,39 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.scheduler.cluster
    +
    +import org.apache.spark.SparkContext
    +import org.apache.spark.scheduler.TaskSchedulerImpl
    +import org.apache.spark.util.IntParam
    +
    +private[spark] class YarnClusterSchedulerBackend(
    +    scheduler: TaskSchedulerImpl,
    +    sc: SparkContext)
    +  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) {
    +
    +  override def start() {
    +    super.start()
    +    var numExecutors = 2
    +    if (sc.getConf.contains("spark.executor.instances")) {
    +      numExecutors = sc.getConf.getInt("spark.executor.instances", 2)
    --- End diff --
    
    Ok but you can just reverse the order right?  Like, first set numExecutors based on SPARK_EXECUTOR_INSTANCES, and then after that have my proposed line


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1946] Submit stage after (configured ra...

Posted by kayousterhout <gi...@git.apache.org>.
Github user kayousterhout commented on the pull request:

    https://github.com/apache/spark/pull/900#issuecomment-45269243
  
    This is also related to SPARK-1937 (#892) -- is fixing that sufficient or is it necessary to have this sleep as well?  It seems like this sleep is only necessary if the sleep time is less than the expected increase in task completion time as a result of running non-locally.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

Posted by li-zhihui <gi...@git.apache.org>.
Github user li-zhihui commented on a diff in the pull request:

    https://github.com/apache/spark/pull/900#discussion_r14225319
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala ---
    @@ -244,6 +255,17 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
             throw new SparkException("Error notifying standalone scheduler's driver actor", e)
         }
       }
    +
    +  override def isReady(): Boolean = {
    +    if (ready) {
    --- End diff --
    
    @kayousterhout Now, the method is called per submitting tasks, it can return quickly by saving the value of  "ready".  
    If we moved waitBackendReady to BackendScheduler.start, the method will be called only once, and we should follow the idea.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1946] Submit stage after (configured nu...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/900#discussion_r13749934
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala ---
    @@ -48,6 +48,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
       var totalCoreCount = new AtomicInteger(0)
       val conf = scheduler.sc.conf
       private val timeout = AkkaUtils.askTimeout(conf)
    +  val minRegisteredNum = conf.getDouble("spark.executor.minRegisteredNum", 0)
    --- End diff --
    
    I agree that the default should be zero to keep same behavior, but it should also be able to be changed to a reasonable default for all people using a cluster.  For instance, I want all spark on yarn customers using grid x to have this default to 90% of their executors available before starting because that is what gives them a good experience.  Like I said if that doesn't make sense for other deployment modes or its very hard to implement then we can work around for yarn, but its adding yet another config which I would prefer not to do


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1946] Submit stage after (configured ra...

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on the pull request:

    https://github.com/apache/spark/pull/900#issuecomment-45779752
  
    This one slipped off my radar, my apologies.
    @tgravescs In #892, if there is even a single executor which is process local with any partition, then we start waiting for all levels based on configured timeouts.
    Here we are trying to ensure there are sufficient executors available before we start accepting jobs. The intent is slightly differrent



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: submit stage after (configured ratio of) execu...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/900#issuecomment-44369774
  
    Can one of the admins verify this patch?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

Posted by li-zhihui <gi...@git.apache.org>.
Github user li-zhihui commented on the pull request:

    https://github.com/apache/spark/pull/900#issuecomment-47331430
  
    @tgravescs @kayousterhout 
    I move waitBackendReady back to submitTasks method, because it (waitBackendReady in start method) dose not work on yarn-cluster mode (NullPointException because SparkContext initialize timeout) (yarn-client is ok).



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

Posted by kayousterhout <gi...@git.apache.org>.
Github user kayousterhout commented on a diff in the pull request:

    https://github.com/apache/spark/pull/900#discussion_r14252092
  
    --- Diff: yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala ---
    @@ -0,0 +1,39 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.scheduler.cluster
    +
    +import org.apache.spark.SparkContext
    +import org.apache.spark.scheduler.TaskSchedulerImpl
    +import org.apache.spark.util.IntParam
    +
    +private[spark] class YarnClusterSchedulerBackend(
    +    scheduler: TaskSchedulerImpl,
    +    sc: SparkContext)
    +  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) {
    +
    +  override def start() {
    +    super.start()
    +    var numExecutors = 2
    +    if (sc.getConf.contains("spark.executor.instances")) {
    +      numExecutors = sc.getConf.getInt("spark.executor.instances", 2)
    --- End diff --
    
    can you eliminate the "if" here and just have
    numExecutors = sc.getConf.getInt("spark.executor.instances", numExecutors)
    
    (and same for the case below)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1946] Submit stage after (configured ra...

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on the pull request:

    https://github.com/apache/spark/pull/900#issuecomment-45780405
  
    Hit submit by mistake, to continue ...
    The side effect of not having sufficient executors are different from #892. For example, 
    a) the default parallelism in yarn is based on number of executors, 
    b) the number of intermediate files per node for shuffle (this can bring the node down btw)
    c) and amount of memory consumed on a node for rdd MEMORY persisted data (making the job fail if disk is not specified : like some of the mllib algos ?)
    and so on ...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1946] Submit stage after (configured ra...

Posted by li-zhihui <gi...@git.apache.org>.
Github user li-zhihui commented on the pull request:

    https://github.com/apache/spark/pull/900#issuecomment-47061333
  
    @tgravescs 
    I add a new commit that move waitBackendReady from DAGScheduler.submitStage to TaskSchedulerImpl.submitTasks, for 2 reasons
    
    * Optimize performance, some works (creating TaskSet, building task preferred locality...)  is irrelevant with executor, that can be done before waiting backend ready.
    * Clear responsibility, it seems waiting backend ready is responsibility of TaskScheduler, not DAGScheduler.
     


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

Posted by li-zhihui <gi...@git.apache.org>.
Github user li-zhihui commented on the pull request:

    https://github.com/apache/spark/pull/900#issuecomment-48714143
  
    Thanks @tgravescs 
    I will file a new jira for handling mesos and follow it after the PR merged.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

Posted by li-zhihui <gi...@git.apache.org>.
Github user li-zhihui commented on a diff in the pull request:

    https://github.com/apache/spark/pull/900#discussion_r14225397
  
    --- Diff: yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---
    @@ -164,6 +164,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
     
       private def startUserClass(): Thread = {
         logInfo("Starting the user JAR in a separate Thread")
    +    System.setProperty("spark.executor.instances", args.numExecutors.toString)
    --- End diff --
    
    It's for yarn-cluster mode.
    In yarn-cluster mode, Driver run in yarn container and lost System config which set in client.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1946] Submit stage after (configured ra...

Posted by kayousterhout <gi...@git.apache.org>.
Github user kayousterhout commented on a diff in the pull request:

    https://github.com/apache/spark/pull/900#discussion_r14036842
  
    --- Diff: yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala ---
    @@ -0,0 +1,51 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.scheduler.cluster
    +
    +
    +import org.apache.spark.{Logging, SparkContext}
    +import org.apache.spark.deploy.yarn.ApplicationMasterArguments
    +import org.apache.spark.scheduler.TaskSchedulerImpl
    +
    +import scala.collection.mutable.ArrayBuffer
    +
    +private[spark] class YarnClusterSchedulerBackend(
    +    scheduler: TaskSchedulerImpl,
    +    sc: SparkContext)
    +  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
    +  with Logging {
    +
    +  private[spark] def addArg(optionName: String, envVar: String, sysProp: String,
    +      arrayBuf: ArrayBuffer[String]) {
    +    if (System.getenv(envVar) != null) {
    +      arrayBuf += (optionName, System.getenv(envVar))
    +    } else if (sc.getConf.contains(sysProp)) {
    +      arrayBuf += (optionName, sc.getConf.get(sysProp))
    +    }
    +  }
    +
    +  override def start() {
    +    super.start()
    +    val argsArrayBuf = new ArrayBuffer[String]()
    +    List(("--num-executors", "SPARK_EXECUTOR_INSTANCES", "spark.executor.instances"),
    +      ("--num-executors", "SPARK_WORKER_INSTANCES", "spark.worker.instances"))
    +      .foreach { case (optName, envVar, sysProp) => addArg(optName, envVar, sysProp, argsArrayBuf) }
    +    val args = new ApplicationMasterArguments(argsArrayBuf.toArray)
    +    totalExecutors.set(args.numExecutors)
    --- End diff --
    
    Ah I see -- I was confused by this http://spark.apache.org/docs/latest/spark-standalone.html -- since standalone mode interprets SPARK_WORKER_INSTANCES differently.  Sorry for the confusion!
    
    My other question here was about the ApplicationMaterArguments here -- does that actually get used, or is it just constructed as a way to get numExecutors?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1946] Submit stage after (configured ra...

Posted by kayousterhout <gi...@git.apache.org>.
Github user kayousterhout commented on a diff in the pull request:

    https://github.com/apache/spark/pull/900#discussion_r14032722
  
    --- Diff: yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala ---
    @@ -0,0 +1,51 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.scheduler.cluster
    +
    +
    +import org.apache.spark.{Logging, SparkContext}
    +import org.apache.spark.deploy.yarn.ApplicationMasterArguments
    +import org.apache.spark.scheduler.TaskSchedulerImpl
    +
    +import scala.collection.mutable.ArrayBuffer
    --- End diff --
    
    fix import ordering


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: submit stage after (configured ratio of) execu...

Posted by pwendell <gi...@git.apache.org>.
Github user pwendell commented on the pull request:

    https://github.com/apache/spark/pull/900#issuecomment-44370086
  
    Jenkins, test this please. @li-zhihui could you create a JIRA for this and add it to the title of the PR (e.g. SPARK-XXX)?
    
    /cc @mridul @tgravescs @kayousterhout


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1946] Submit stage after (configured ra...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/900#discussion_r14132807
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala ---
    @@ -46,9 +46,14 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
     {
       // Use an atomic variable to track total number of cores in the cluster for simplicity and speed
       var totalCoreCount = new AtomicInteger(0)
    +  var totalExecutors = new AtomicInteger(0)
       val conf = scheduler.sc.conf
       private val timeout = AkkaUtils.askTimeout(conf)
       private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
    +  var minRegisteredRatio = conf.getDouble("spark.scheduler.minRegisteredExecutorsRatio", 0)
    +  val maxRegisteredWaitingTime = conf.getInt("spark.scheduler.maxRegisteredExecutorsWaitingTime", 30000)
    --- End diff --
    
    can you please add docs for the new configs.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1946] Submit stage after (configured nu...

Posted by sryza <gi...@git.apache.org>.
Github user sryza commented on the pull request:

    https://github.com/apache/spark/pull/900#issuecomment-45840353
  
    Sorry to jump in late on this, but I think spark.executor.minRegisteredNum sounds like an executor property, when this is a property of the driver.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

Posted by kayousterhout <gi...@git.apache.org>.
Github user kayousterhout commented on a diff in the pull request:

    https://github.com/apache/spark/pull/900#discussion_r14252164
  
    --- Diff: yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala ---
    @@ -0,0 +1,39 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.scheduler.cluster
    +
    +import org.apache.spark.SparkContext
    +import org.apache.spark.scheduler.TaskSchedulerImpl
    +import org.apache.spark.util.IntParam
    +
    +private[spark] class YarnClusterSchedulerBackend(
    +    scheduler: TaskSchedulerImpl,
    +    sc: SparkContext)
    +  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) {
    +
    +  override def start() {
    +    super.start()
    +    var numExecutors = 2
    +    if (sc.getConf.contains("spark.executor.instances")) {
    +      numExecutors = sc.getConf.getInt("spark.executor.instances", 2)
    +    } else if (System.getenv("SPARK_EXECUTOR_INSTANCES") != null) {
    +      IntParam.unapply(System.getenv("SPARK_EXECUTOR_INSTANCES")).map(_.toInt).getOrElse(2)
    --- End diff --
    
    Also what about SPARK_WORKER_INSTANCES? My understanding is that checking that environment variable is necessary for backwards compatibility (correct me if I'm wrong here @tgravescs)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1946] Submit stage after (configured ra...

Posted by li-zhihui <gi...@git.apache.org>.
Github user li-zhihui commented on the pull request:

    https://github.com/apache/spark/pull/900#issuecomment-46657630
  
    @tgravescs @mridulm 
    In my test case(4 nodes, 128 executors), it need 25 seconds to register all executors.
    Now maxRegisteredWaitingTime = 10 senconds, I think it's not enough.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

Posted by kayousterhout <gi...@git.apache.org>.
Github user kayousterhout commented on a diff in the pull request:

    https://github.com/apache/spark/pull/900#discussion_r14280463
  
    --- Diff: yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala ---
    @@ -0,0 +1,39 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.scheduler.cluster
    +
    +import org.apache.spark.SparkContext
    +import org.apache.spark.scheduler.TaskSchedulerImpl
    +import org.apache.spark.util.IntParam
    +
    +private[spark] class YarnClusterSchedulerBackend(
    +    scheduler: TaskSchedulerImpl,
    +    sc: SparkContext)
    +  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) {
    +
    +  override def start() {
    +    super.start()
    +    var numExecutors = 2
    +    if (sc.getConf.contains("spark.executor.instances")) {
    +      numExecutors = sc.getConf.getInt("spark.executor.instances", 2)
    --- End diff --
    
    numExecutors = sc.getConf.getInt("spark.executor.instances", numExecutors)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1946] Submit stage after (configured ra...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/900#discussion_r13977201
  
    --- Diff: yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---
    @@ -164,6 +164,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
     
       private def startUserClass(): Thread = {
         logInfo("Starting the user JAR in a separate Thread")
    +    System.setProperty("spark.executor.instances",args.numExecutors.toString)
    --- End diff --
    
    please add space after ,


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1946] Submit stage after (configured ra...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/900#discussion_r13976699
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala ---
    @@ -244,6 +252,17 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
             throw new SparkException("Error notifying standalone scheduler's driver actor", e)
         }
       }
    +
    +  override def isReady(): Boolean = {
    +    if (ready){
    --- End diff --
    
    add a space after ) before {


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/900


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

Posted by kayousterhout <gi...@git.apache.org>.
Github user kayousterhout commented on a diff in the pull request:

    https://github.com/apache/spark/pull/900#discussion_r14252031
  
    --- Diff: yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala ---
    @@ -0,0 +1,39 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.scheduler.cluster
    +
    +import org.apache.spark.SparkContext
    +import org.apache.spark.scheduler.TaskSchedulerImpl
    +import org.apache.spark.util.IntParam
    +
    +private[spark] class YarnClusterSchedulerBackend(
    +    scheduler: TaskSchedulerImpl,
    +    sc: SparkContext)
    +  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) {
    +
    +  override def start() {
    +    super.start()
    +    var numExecutors = 2
    --- End diff --
    
    Can you read this from a constant (you'll need to extract the constant) in ApplicationMasterArguments, rather than defining the default in both places?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1946] Submit stage after (configured ra...

Posted by kayousterhout <gi...@git.apache.org>.
Github user kayousterhout commented on a diff in the pull request:

    https://github.com/apache/spark/pull/900#discussion_r14033313
  
    --- Diff: yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala ---
    @@ -0,0 +1,51 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.scheduler.cluster
    +
    +
    +import org.apache.spark.{Logging, SparkContext}
    +import org.apache.spark.deploy.yarn.ApplicationMasterArguments
    +import org.apache.spark.scheduler.TaskSchedulerImpl
    +
    +import scala.collection.mutable.ArrayBuffer
    +
    +private[spark] class YarnClusterSchedulerBackend(
    +    scheduler: TaskSchedulerImpl,
    +    sc: SparkContext)
    +  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
    +  with Logging {
    +
    +  private[spark] def addArg(optionName: String, envVar: String, sysProp: String,
    +      arrayBuf: ArrayBuffer[String]) {
    +    if (System.getenv(envVar) != null) {
    +      arrayBuf += (optionName, System.getenv(envVar))
    +    } else if (sc.getConf.contains(sysProp)) {
    +      arrayBuf += (optionName, sc.getConf.get(sysProp))
    +    }
    +  }
    +
    +  override def start() {
    +    super.start()
    +    val argsArrayBuf = new ArrayBuffer[String]()
    +    List(("--num-executors", "SPARK_EXECUTOR_INSTANCES", "spark.executor.instances"),
    +      ("--num-executors", "SPARK_WORKER_INSTANCES", "spark.worker.instances"))
    +      .foreach { case (optName, envVar, sysProp) => addArg(optName, envVar, sysProp, argsArrayBuf) }
    +    val args = new ApplicationMasterArguments(argsArrayBuf.toArray)
    +    totalExecutors.set(args.numExecutors)
    --- End diff --
    
    I'm a little confused here -- is the point of this code just to set CoarseGrainedSchedulerBackend.totalExecutors?  Why do you check both SPARK_WORKER_INSTANCES and SPARK_EXECUTOR_INSTANCES to set the number of executors?  Don't these mean different things?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

Posted by kayousterhout <gi...@git.apache.org>.
Github user kayousterhout commented on a diff in the pull request:

    https://github.com/apache/spark/pull/900#discussion_r14225485
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala ---
    @@ -244,6 +255,17 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
             throw new SparkException("Error notifying standalone scheduler's driver actor", e)
         }
       }
    +
    +  override def isReady(): Boolean = {
    +    if (ready) {
    --- End diff --
    
    Based on my experience profiling the Spark scheduler, things like this do not affect performance in any significant way and in practice are often optimized out by JIT anyway, so we should opt for the more readable version


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

Posted by kayousterhout <gi...@git.apache.org>.
Github user kayousterhout commented on a diff in the pull request:

    https://github.com/apache/spark/pull/900#discussion_r14202548
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala ---
    @@ -244,6 +255,17 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
             throw new SparkException("Error notifying standalone scheduler's driver actor", e)
         }
       }
    +
    +  override def isReady(): Boolean = {
    +    if (ready) {
    --- End diff --
    
    I think saving the value of "ready" makes the code a bit difficult to read here, in part because it doesn't actually signal whether the backend is ready (since isReady() could return true even when ready is false).  Can you just eliminate "ready" and move this line:
    
    if (executorActor.size >= totalExecutors.get() * minRegisteredRatio) {
    
    to here?



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

Posted by kayousterhout <gi...@git.apache.org>.
Github user kayousterhout commented on a diff in the pull request:

    https://github.com/apache/spark/pull/900#discussion_r14205832
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala ---
    @@ -46,9 +46,17 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
     {
       // Use an atomic variable to track total number of cores in the cluster for simplicity and speed
       var totalCoreCount = new AtomicInteger(0)
    +  var totalExecutors = new AtomicInteger(0)
    --- End diff --
    
    Or maybe it would be better to rename this totalExpectedExecutors?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

Posted by kayousterhout <gi...@git.apache.org>.
Github user kayousterhout commented on a diff in the pull request:

    https://github.com/apache/spark/pull/900#discussion_r14205252
  
    --- Diff: yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---
    @@ -164,6 +164,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
     
       private def startUserClass(): Thread = {
         logInfo("Starting the user JAR in a separate Thread")
    +    System.setProperty("spark.executor.instances", args.numExecutors.toString)
    --- End diff --
    
    Why do you need to set this here?  Is this for the case when args.numExecutors was set by SPARK_EXECUTOR_INSTANCES (since otherwise it seems like spark.executor.instances will already be set, right)?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1946] Submit stage after (configured nu...

Posted by CrazyJvm <gi...@git.apache.org>.
Github user CrazyJvm commented on a diff in the pull request:

    https://github.com/apache/spark/pull/900#discussion_r13692635
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala ---
    @@ -225,6 +232,17 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
             throw new SparkException("Error notifying standalone scheduler's driver actor", e)
         }
       }
    +
    +  override def isReady(): Boolean = {
    +    if (ready){
    +      return true
    +    }
    +    if ((System.currentTimeMillis() - createTime) >= maxRegisteredWaitingTime) {
    +      ready = true
    +      return true
    +    }
    +    return false
    --- End diff --
    
    no need "return" i think


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/900#discussion_r14825405
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala ---
    @@ -244,6 +257,17 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
             throw new SparkException("Error notifying standalone scheduler's driver actor", e)
         }
       }
    +
    +  override def isReady(): Boolean = {
    +    if (ready) {
    +      return true
    +    }
    +    if ((System.currentTimeMillis() - createTime) >= maxRegisteredWaitingTime) {
    --- End diff --
    
    it might be nice to have a log statement here saying max time hit so we know when the scheduling began if debugging a job.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on the pull request:

    https://github.com/apache/spark/pull/900#issuecomment-48955561
  
    Looks good.  Thanks @li-zhihui 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1946]Submit stage after (configured rat...

Posted by li-zhihui <gi...@git.apache.org>.
Github user li-zhihui commented on the pull request:

    https://github.com/apache/spark/pull/900#issuecomment-44374073
  
    I create a JIRA and attach a file to describe it.
    https://issues.apache.org/jira/secure/attachment/12647078/Spark%20Task%20Scheduler%20Optimization%20Proposal.pptx
    
    @mridulm


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1946] Submit stage after (configured nu...

Posted by li-zhihui <gi...@git.apache.org>.
Github user li-zhihui commented on the pull request:

    https://github.com/apache/spark/pull/900#issuecomment-45836171
  
    @tgravescs  @mridulm @kayousterhout I add a commit which submit stage after configured number executor are registered.
    
    # submit stage only after successfully registered executors arrived the number, default value 0
    spark.executor.minRegisteredNum = 20


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1946] Submit stage after (configured ra...

Posted by li-zhihui <gi...@git.apache.org>.
Github user li-zhihui commented on a diff in the pull request:

    https://github.com/apache/spark/pull/900#discussion_r13688559
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
    @@ -431,6 +431,16 @@ private[spark] class TaskSchedulerImpl(
     
       // By default, rack is unknown
       def getRackForHost(value: String): Option[String] = None
    +  override def waitBackendReady():Unit={
    +    if(backend.isReady){
    +      return
    +    }
    +    while(!backend.isReady){
    +      synchronized{
    +        this.wait(100)
    --- End diff --
    
    Just for programming simply. :)
    If someone would like to implement more backend implementations or change backend.isReady somewhere, they needn't to call NOTIFY().
    
    But, waiting 100 milliseconds maybe too long, is 10 milliseconds OK?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

Posted by li-zhihui <gi...@git.apache.org>.
Github user li-zhihui commented on a diff in the pull request:

    https://github.com/apache/spark/pull/900#discussion_r14232018
  
    --- Diff: yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala ---
    @@ -0,0 +1,51 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.scheduler.cluster
    +
    +
    +import org.apache.spark.{Logging, SparkContext}
    +import org.apache.spark.deploy.yarn.ApplicationMasterArguments
    +import org.apache.spark.scheduler.TaskSchedulerImpl
    +
    +import scala.collection.mutable.ArrayBuffer
    +
    +private[spark] class YarnClusterSchedulerBackend(
    +    scheduler: TaskSchedulerImpl,
    +    sc: SparkContext)
    +  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
    +  with Logging {
    +
    +  private[spark] def addArg(optionName: String, envVar: String, sysProp: String,
    +      arrayBuf: ArrayBuffer[String]) {
    +    if (System.getenv(envVar) != null) {
    +      arrayBuf += (optionName, System.getenv(envVar))
    +    } else if (sc.getConf.contains(sysProp)) {
    +      arrayBuf += (optionName, sc.getConf.get(sysProp))
    +    }
    +  }
    +
    +  override def start() {
    +    super.start()
    +    val argsArrayBuf = new ArrayBuffer[String]()
    +    List(("--num-executors", "SPARK_EXECUTOR_INSTANCES", "spark.executor.instances"),
    +      ("--num-executors", "SPARK_WORKER_INSTANCES", "spark.worker.instances"))
    +      .foreach { case (optName, envVar, sysProp) => addArg(optName, envVar, sysProp, argsArrayBuf) }
    +    val args = new ApplicationMasterArguments(argsArrayBuf.toArray)
    +    totalExecutors.set(args.numExecutors)
    --- End diff --
    
    @kayousterhout Done. 
    About constants, maybe we can take another PR to manage constants for the whole project.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1946] Submit stage after (configured ra...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on the pull request:

    https://github.com/apache/spark/pull/900#issuecomment-45776939
  
    @kayousterhout @mridulm Looking briefly at pr #892 it seems that is handling locality when executors are added later and I assume some of the locality wait configs come into affect here, but it doesn't just wait for a certain number of executors to be there to start (or period of time), does it?  Are the changes good enough where that waiting for a good percentage of the executors no longer matters?   
    
    I seems like depending on your resource manager and how busy of a cluster you run on it could take a while (minutes) to get a large number of executors. I think this should just be a configuration and user code should not have to "sleep" or workaround this.  



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1946] Submit stage after (configured ra...

Posted by li-zhihui <gi...@git.apache.org>.
Github user li-zhihui commented on the pull request:

    https://github.com/apache/spark/pull/900#issuecomment-45833678
  
    @tgravescs @kayousterhout The PR only work in standalone mode now. But it provide a abstract method isReady() in SchedulerBackend.scala for all backend implementations.
    
    For yarn mode, it seems better to use registered number as threshold.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

Posted by li-zhihui <gi...@git.apache.org>.
Github user li-zhihui commented on a diff in the pull request:

    https://github.com/apache/spark/pull/900#discussion_r14813843
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala ---
    @@ -46,9 +46,19 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
     {
       // Use an atomic variable to track total number of cores in the cluster for simplicity and speed
       var totalCoreCount = new AtomicInteger(0)
    +  var totalExpectedExecutors = new AtomicInteger(0)
       val conf = scheduler.sc.conf
       private val timeout = AkkaUtils.askTimeout(conf)
       private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
    +  // Submit tasks only after (registered executors / total expected executors) 
    +  // is equal to at least this value, that is double between 0 and 1.
    +  var minRegisteredRatio = conf.getDouble("spark.scheduler.minRegisteredExecutorsRatio", 0)
    --- End diff --
    
    @tgravescs Done.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

Posted by li-zhihui <gi...@git.apache.org>.
Github user li-zhihui commented on a diff in the pull request:

    https://github.com/apache/spark/pull/900#discussion_r14279974
  
    --- Diff: yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala ---
    @@ -0,0 +1,39 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.scheduler.cluster
    +
    +import org.apache.spark.SparkContext
    +import org.apache.spark.scheduler.TaskSchedulerImpl
    +import org.apache.spark.util.IntParam
    +
    +private[spark] class YarnClusterSchedulerBackend(
    +    scheduler: TaskSchedulerImpl,
    +    sc: SparkContext)
    +  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) {
    +
    +  override def start() {
    +    super.start()
    +    var numExecutors = 2
    +    if (sc.getConf.contains("spark.executor.instances")) {
    +      numExecutors = sc.getConf.getInt("spark.executor.instances", 2)
    --- End diff --
    
    @kayousterhout There is rule: system properties override environment variables. To eliminate the "if" will lead to environment variable override system property.
    https://github.com/apache/spark/blob/master/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala#L62
    
    BTW @tgravescs It seems these codes  against the rule
    https://github.com/apache/spark/blob/master/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala#L36


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1946] Submit stage after (configured ra...

Posted by li-zhihui <gi...@git.apache.org>.
Github user li-zhihui commented on a diff in the pull request:

    https://github.com/apache/spark/pull/900#discussion_r14010613
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala ---
    @@ -225,6 +232,17 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
             throw new SparkException("Error notifying standalone scheduler's driver actor", e)
         }
       }
    +
    +  override def isReady(): Boolean = {
    +    if (ready){
    +      return true
    +    }
    +    if ((System.currentTimeMillis() - createTime) >= maxRegisteredWaitingTime) {
    +      ready = true
    +      return true
    +    }
    +    return false
    --- End diff --
    
    Thanks @CrazyJvm I made a mistake, the last "return" is not necessary.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1946] Submit stage after (configured ra...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on the pull request:

    https://github.com/apache/spark/pull/900#issuecomment-45795044
  
    If people think its not useful for other deployments modes then I can look into making it YARN specific. We already have a small sleep to help with this in YarnClusterScheduler.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

Posted by li-zhihui <gi...@git.apache.org>.
Github user li-zhihui commented on a diff in the pull request:

    https://github.com/apache/spark/pull/900#discussion_r14280169
  
    --- Diff: yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala ---
    @@ -0,0 +1,39 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.scheduler.cluster
    +
    +import org.apache.spark.SparkContext
    +import org.apache.spark.scheduler.TaskSchedulerImpl
    +import org.apache.spark.util.IntParam
    +
    +private[spark] class YarnClusterSchedulerBackend(
    +    scheduler: TaskSchedulerImpl,
    +    sc: SparkContext)
    +  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) {
    +
    +  override def start() {
    +    super.start()
    +    var numExecutors = 2
    --- End diff --
    
    @kayousterhout The constant (default value of numExecutors) is not only defined in this class and  ApplicationMasterArguments. It is also defined in ClientArguments. 
    
    Even I guess the below default values are from same consideration like numExecurots.
    https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkContext.scala#L1232
    https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkContext.scala#L1232
    https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkContext.scala#L1232
    
    If we want to make the value to be constant, we should considerate all of them.
    So maybe we can add an object org.apache.spark.Constants to manange all constants.
    @tgravescs 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

Posted by li-zhihui <gi...@git.apache.org>.
Github user li-zhihui commented on the pull request:

    https://github.com/apache/spark/pull/900#issuecomment-48869936
  
    @tgravescs add a commit according to comments.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1946] Submit stage after (configured ra...

Posted by kayousterhout <gi...@git.apache.org>.
Github user kayousterhout commented on the pull request:

    https://github.com/apache/spark/pull/900#issuecomment-45786350
  
    @li-zhihui it looks like the JIRA you created (https://issues.apache.org/jira/browse/SPARK-1946) describes the issue fixed by #892 and described by a redundant issue (https://issues.apache.org/jira/browse/SPARK-1937). As @mridulm explained (thanks!!), the primary set of issues addressed by this pull request center around the fact that Spark-on-YARN has various performance problems when not enough executors have registered yet.  Could you update SPARK-1946 accordingly?
    
    @tgravescs I'm a little nervous about adding more scheduler config options, because I think the average user would have a very difficult time figuring out that their performance problems could be fixed by tuning this particular set of options.  The scheduler already has quite a few config options and I think we should be very cautious in adding more (cc @pwendell).  On the other hand, as you pointed out, it seems like a user typically wants to wait for some number of executors to become available, and those semantics aren't available to the application -- so we're stuck with adding something to the scheduler code.  Is it possible to do this only for the YARN scheduler / do you think it's necessary in standalone too?  Doing it only for YARN (and naming the config variable accordingly) could help signal to a naive user when tuning this might help.  From @mridulm's description, it sounds like many of the issues here are yarn-specific.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

Posted by pwendell <gi...@git.apache.org>.
Github user pwendell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/900#discussion_r14225947
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala ---
    @@ -244,6 +255,17 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
             throw new SparkException("Error notifying standalone scheduler's driver actor", e)
         }
       }
    +
    +  override def isReady(): Boolean = {
    +    if (ready) {
    --- End diff --
    
    It would be simpler to just inline the following code. There is no valid performance argument for separating it.
    
    ```
    (executorActor.size >= totalExecutors.get() * minRegisteredRatio)
    ```
    
    Referencing the size of a HashMap in scala is a constant time operation. See
    
    https://github.com/scala/scala/blob/v2.10.4/src/library/scala/collection/mutable/HashTable.scala#L48
    
    `totalExecutors.get()` is also a constant time operation.
    
    I don't see any performance argument for the current approach.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1946] Submit stage after (configured nu...

Posted by li-zhihui <gi...@git.apache.org>.
Github user li-zhihui commented on a diff in the pull request:

    https://github.com/apache/spark/pull/900#discussion_r13793268
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala ---
    @@ -48,6 +48,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
       var totalCoreCount = new AtomicInteger(0)
       val conf = scheduler.sc.conf
       private val timeout = AkkaUtils.askTimeout(conf)
    +  val minRegisteredNum = conf.getDouble("spark.executor.minRegisteredNum", 0)
    --- End diff --
    
    discard the commit.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/900#discussion_r14265985
  
    --- Diff: yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala ---
    @@ -0,0 +1,39 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.scheduler.cluster
    +
    +import org.apache.spark.SparkContext
    +import org.apache.spark.scheduler.TaskSchedulerImpl
    +import org.apache.spark.util.IntParam
    +
    +private[spark] class YarnClusterSchedulerBackend(
    +    scheduler: TaskSchedulerImpl,
    +    sc: SparkContext)
    +  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) {
    +
    +  override def start() {
    +    super.start()
    +    var numExecutors = 2
    +    if (sc.getConf.contains("spark.executor.instances")) {
    +      numExecutors = sc.getConf.getInt("spark.executor.instances", 2)
    +    } else if (System.getenv("SPARK_EXECUTOR_INSTANCES") != null) {
    +      IntParam.unapply(System.getenv("SPARK_EXECUTOR_INSTANCES")).map(_.toInt).getOrElse(2)
    --- End diff --
    
    well the SPARK_WORKER_INSTANCES env variables was only using in yarn-client mode so isn't strictly needed here since this class is backend for yarn-cluster mode..  I assume that is why it was removed.   Something I hadn't thought of when you first asked.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1946] Submit stage after (configured nu...

Posted by li-zhihui <gi...@git.apache.org>.
Github user li-zhihui commented on a diff in the pull request:

    https://github.com/apache/spark/pull/900#discussion_r13692995
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala ---
    @@ -225,6 +232,17 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
             throw new SparkException("Error notifying standalone scheduler's driver actor", e)
         }
       }
    +
    +  override def isReady(): Boolean = {
    +    if (ready){
    +      return true
    +    }
    +    if ((System.currentTimeMillis() - createTime) >= maxRegisteredWaitingTime) {
    +      ready = true
    +      return true
    +    }
    +    return false
    --- End diff --
    
    Thanks @CrazyJvm I am a Scala beginner. But if I remove "return", the method always return true. I don't know why. I use Scala 2.10.3.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1946] Submit stage after (configured ra...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/900#discussion_r13977443
  
    --- Diff: yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala ---
    @@ -77,6 +77,12 @@ private[spark] class YarnClientSchedulerBackend(
     
         logDebug("ClientArguments called with: " + argsArrayBuf)
         val args = new ClientArguments(argsArrayBuf.toArray, conf)
    +    totalExecutors.set(args.numExecutors)
    +    // reset default minRegisteredRatio for yarn mode
    +    if (minRegisteredRatio == 0) {
    +      minRegisteredRatio = 0.9
    --- End diff --
    
    I'm a bit on the fence about making the default 90%.  While I want it high on yarn I was originally thinking this would just be set by a cluster config (user level) and leave the default in Spark code as 0 for backwards compatibility. So its probably better to remove this and leave default 0 unless others disagree.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1946] Submit stage after (configured nu...

Posted by li-zhihui <gi...@git.apache.org>.
Github user li-zhihui commented on a diff in the pull request:

    https://github.com/apache/spark/pull/900#discussion_r13740596
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala ---
    @@ -48,6 +48,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
       var totalCoreCount = new AtomicInteger(0)
       val conf = scheduler.sc.conf
       private val timeout = AkkaUtils.askTimeout(conf)
    +  val minRegisteredNum = conf.getDouble("spark.executor.minRegisteredNum", 0)
    --- End diff --
    
    @tgravescs 
    I think default value the config property should be 0 for keeping consistent with previous version, whatever we use number or percentage. 
    In my opinion, using executor number as config property should keep consistent with the args(and conf, env) --num-executors, user  can accept the conf property more easily.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1946] Submit stage after (configured ra...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on the pull request:

    https://github.com/apache/spark/pull/900#issuecomment-45268163
  
    This is very similar to SPARK-1453 and this may solve it generically instead of just yarn.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1946] Submit stage after (configured ra...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on the pull request:

    https://github.com/apache/spark/pull/900#issuecomment-45794544
  
    yarn uses CoarseGrainedSchedulerBackend for standalone mode and the YarnClientSchedulerBackend (which is based on CoarseGrainedSchedulerBackend) for client mode.  I don't see this pr handling that since its not incrementing the totalExecutors. I haven't looked at it in detail yet but I would think you could do it in RegisterExecutor and that would handle both since SparkDeploySchedulerBackend is based on CoarseGrainedSchedulerBackend also.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1946] Submit stage after (configured ra...

Posted by mridul <gi...@git.apache.org>.
Github user mridul commented on the pull request:

    https://github.com/apache/spark/pull/900#issuecomment-46661369
  
    Hi,
    Sorry to intrude in your thread, but perhaps you have the wrong @mridul
    referenced in your comments :)
    
    Mridul
    
    
    On 20 June 2014 14:01, Zhihui Li <no...@github.com> wrote:
    
    > Thanks @tgravescs <https://github.com/tgravescs>
    > I add a new commit.
    >
    >    - code style
    >    - default minRegisteredRatio = 0 in yarn mode
    >    - driver get --num-executors in yarn/alpha
    >
    > —
    > Reply to this email directly or view it on GitHub
    > <https://github.com/apache/spark/pull/900#issuecomment-46655979>.
    >


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1946] Submit stage after (configured ra...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/900#discussion_r14035880
  
    --- Diff: yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala ---
    @@ -0,0 +1,51 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.scheduler.cluster
    +
    +
    +import org.apache.spark.{Logging, SparkContext}
    +import org.apache.spark.deploy.yarn.ApplicationMasterArguments
    +import org.apache.spark.scheduler.TaskSchedulerImpl
    +
    +import scala.collection.mutable.ArrayBuffer
    +
    +private[spark] class YarnClusterSchedulerBackend(
    +    scheduler: TaskSchedulerImpl,
    +    sc: SparkContext)
    +  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
    +  with Logging {
    +
    +  private[spark] def addArg(optionName: String, envVar: String, sysProp: String,
    +      arrayBuf: ArrayBuffer[String]) {
    +    if (System.getenv(envVar) != null) {
    +      arrayBuf += (optionName, System.getenv(envVar))
    +    } else if (sc.getConf.contains(sysProp)) {
    +      arrayBuf += (optionName, sc.getConf.get(sysProp))
    +    }
    +  }
    +
    +  override def start() {
    +    super.start()
    +    val argsArrayBuf = new ArrayBuffer[String]()
    +    List(("--num-executors", "SPARK_EXECUTOR_INSTANCES", "spark.executor.instances"),
    +      ("--num-executors", "SPARK_WORKER_INSTANCES", "spark.worker.instances"))
    +      .foreach { case (optName, envVar, sysProp) => addArg(optName, envVar, sysProp, argsArrayBuf) }
    +    val args = new ApplicationMasterArguments(argsArrayBuf.toArray)
    +    totalExecutors.set(args.numExecutors)
    --- End diff --
    
    no, the config used to be called SPARK_WORKER_INSTANCES and now its SPARK_EXECUTOR_INSTANCES.  Workers really meant executors on yarn. So this is just for backwards compatibility.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1946] Submit stage after (configured ra...

Posted by li-zhihui <gi...@git.apache.org>.
Github user li-zhihui commented on the pull request:

    https://github.com/apache/spark/pull/900#issuecomment-46655979
  
    Thanks @tgravescs 
    I add a new commit.
    * code style
    * default minRegisteredRatio = 0 in yarn mode
    * driver get --num-executors in yarn/alpha



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/900#discussion_r14823260
  
    --- Diff: docs/configuration.md ---
    @@ -699,6 +699,22 @@ Apart from these, the following properties are also available, and may be useful
         (in milliseconds)
       </td>
     </tr>
    +</tr>
    +  <td><code>spark.scheduler.minRegisteredExecutorsRatio</code></td>
    +  <td>0</td>
    +  <td>
    +    Submit tasks only after (registered executors / total expected executors)
    +    is equal to at least this value, which is double between 0 and 1.
    +  </td>
    +</tr>
    +<tr>
    +  <td><code>spark.scheduler.maxRegisteredExecutorsWaitingTime</code></td>
    +  <td>30000</td>
    +  <td>
    +    Whatever (registered executors / total expected executors) is reached 
    --- End diff --
    
    I think we should clarify both of these a bit because its really you start when either one is hit so I think adding reference to maxRegisteredExecutorsWaitingTime from the description of minRegisteredExecutorsRatio would be good.   
    
    How about something like below?  Note I'm not a doc writer so I'm fine with changing.
    
    for spark.scheduler.minRegisteredExecutorsRatio:
    The minimum ratio of registered executors (registered executors / total expected executors) to wait for before scheduling begins. Specified as a double between 0 and 1. Regardless of whether the minimum ratio of executors has been reached, the maximum amount of time it will wait before scheduling begins is controlled by config <code>spark.scheduler.maxRegisteredExecutorsWaitingTime</code> . 
    
    Then for spark.scheduler.maxRegisteredExecutorsWaitingTime:
    Maximum amount of time to wait for executors to register before scheduling begins (in milliseconds). 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

Posted by li-zhihui <gi...@git.apache.org>.
Github user li-zhihui commented on a diff in the pull request:

    https://github.com/apache/spark/pull/900#discussion_r14280444
  
    --- Diff: yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala ---
    @@ -0,0 +1,39 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.scheduler.cluster
    +
    +import org.apache.spark.SparkContext
    +import org.apache.spark.scheduler.TaskSchedulerImpl
    +import org.apache.spark.util.IntParam
    +
    +private[spark] class YarnClusterSchedulerBackend(
    +    scheduler: TaskSchedulerImpl,
    +    sc: SparkContext)
    +  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) {
    +
    +  override def start() {
    +    super.start()
    +    var numExecutors = 2
    +    if (sc.getConf.contains("spark.executor.instances")) {
    +      numExecutors = sc.getConf.getInt("spark.executor.instances", 2)
    --- End diff --
    
    If we set numExecutors based on SPARK_EXECUTORS_INSTANCE firstly, because sc.getConf.getInt("spark.executor.instances", 2) will always return a value, it will lead to SPARK_EXECUTORS_INSTANCE be override whatever spark.executor.instances is configured.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: submit stage after (configured ratio of) execu...

Posted by sryza <gi...@git.apache.org>.
Github user sryza commented on the pull request:

    https://github.com/apache/spark/pull/900#issuecomment-44370236
  
    This should probably be unified with https://github.com/apache/spark/pull/634


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

Posted by kayousterhout <gi...@git.apache.org>.
Github user kayousterhout commented on the pull request:

    https://github.com/apache/spark/pull/900#issuecomment-47189335
  
    That seems like a good plan


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1946] Submit stage after (configured ra...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/900#discussion_r13808991
  
    --- Diff: yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala ---
    @@ -77,6 +77,12 @@ private[spark] class YarnClientSchedulerBackend(
     
         logDebug("ClientArguments called with: " + argsArrayBuf)
         val args = new ClientArguments(argsArrayBuf.toArray, conf)
    +    totalExecutors.set(args.numExecutors)
    --- End diff --
    
    note that yarn has 2 modes - yarn-client and yarn-cluster mode. This changes it for yarn-client mode, but not yarn-cluster mode.  yarn-cluster mode currently just uses CoarseGrainedSchedulerBackend directly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1946] Submit stage after (configured ra...

Posted by kayousterhout <gi...@git.apache.org>.
Github user kayousterhout commented on a diff in the pull request:

    https://github.com/apache/spark/pull/900#discussion_r14032678
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala ---
    @@ -46,9 +46,14 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
     {
       // Use an atomic variable to track total number of cores in the cluster for simplicity and speed
       var totalCoreCount = new AtomicInteger(0)
    +  var totalExecutors = new AtomicInteger(0)
       val conf = scheduler.sc.conf
       private val timeout = AkkaUtils.askTimeout(conf)
       private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
    +  var minRegisteredRatio = conf.getDouble("spark.scheduler.minRegisteredRatio", 0)
    --- End diff --
    
    Can you change this to minRegisteredExecutorsRatio?  It's more verbose but I think better to be more descriptive.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

Posted by kayousterhout <gi...@git.apache.org>.
Github user kayousterhout commented on a diff in the pull request:

    https://github.com/apache/spark/pull/900#discussion_r14225862
  
    --- Diff: yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala ---
    @@ -0,0 +1,51 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.scheduler.cluster
    +
    +
    +import org.apache.spark.{Logging, SparkContext}
    +import org.apache.spark.deploy.yarn.ApplicationMasterArguments
    +import org.apache.spark.scheduler.TaskSchedulerImpl
    +
    +import scala.collection.mutable.ArrayBuffer
    +
    +private[spark] class YarnClusterSchedulerBackend(
    +    scheduler: TaskSchedulerImpl,
    +    sc: SparkContext)
    +  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
    +  with Logging {
    +
    +  private[spark] def addArg(optionName: String, envVar: String, sysProp: String,
    +      arrayBuf: ArrayBuffer[String]) {
    +    if (System.getenv(envVar) != null) {
    +      arrayBuf += (optionName, System.getenv(envVar))
    +    } else if (sc.getConf.contains(sysProp)) {
    +      arrayBuf += (optionName, sc.getConf.get(sysProp))
    +    }
    +  }
    +
    +  override def start() {
    +    super.start()
    +    val argsArrayBuf = new ArrayBuffer[String]()
    +    List(("--num-executors", "SPARK_EXECUTOR_INSTANCES", "spark.executor.instances"),
    +      ("--num-executors", "SPARK_WORKER_INSTANCES", "spark.worker.instances"))
    +      .foreach { case (optName, envVar, sysProp) => addArg(optName, envVar, sysProp, argsArrayBuf) }
    +    val args = new ApplicationMasterArguments(argsArrayBuf.toArray)
    +    totalExecutors.set(args.numExecutors)
    --- End diff --
    
    Yeah can you just do that (extract the value as a static constant)?  Then this file can be reduced to just checking for the value of the two environment variables and for the spark.executor.instances conf variable and setting totalExecutors accordingly (spark.worker.instances shouldn't be used I don't think -- see #1214)



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1946] Submit stage after (configured ra...

Posted by li-zhihui <gi...@git.apache.org>.
Github user li-zhihui commented on the pull request:

    https://github.com/apache/spark/pull/900#issuecomment-46806867
  
    @tgravescs @kayousterhout I add a new commit
    
    * Code style
    * Rename configuration property name and set default value of maxRegisteredExecutorsWaitingTime  to 30000
    spark.scheduler.minRegisteredExecutorsRatio = 0
    spark.scheduler.maxRegisteredExecutorsWaitingTime = 30000



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1946] Submit stage after (configured ra...

Posted by li-zhihui <gi...@git.apache.org>.
Github user li-zhihui commented on a diff in the pull request:

    https://github.com/apache/spark/pull/900#discussion_r13895415
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala ---
    @@ -225,6 +232,17 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
             throw new SparkException("Error notifying standalone scheduler's driver actor", e)
         }
       }
    +
    +  override def isReady(): Boolean = {
    +    if (ready){
    +      return true
    +    }
    +    if ((System.currentTimeMillis() - createTime) >= maxRegisteredWaitingTime) {
    +      ready = true
    +      return true
    +    }
    +    return false
    --- End diff --
    
    thanks @adrian-wang , but I think it's necessary to return true quickly, because ready is true most time.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1946] Submit stage after (configured ra...

Posted by li-zhihui <gi...@git.apache.org>.
Github user li-zhihui commented on the pull request:

    https://github.com/apache/spark/pull/900#issuecomment-46291702
  
    @tgravescs I add a commit support yarn-cluster.
    
    A little issue, the YarnClusterSchedulerBackend can't get --num-executors as totalExecutors currently(spark-default.xml is ok).
    I will follow it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/900#discussion_r14764078
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala ---
    @@ -46,9 +46,19 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
     {
       // Use an atomic variable to track total number of cores in the cluster for simplicity and speed
       var totalCoreCount = new AtomicInteger(0)
    +  var totalExpectedExecutors = new AtomicInteger(0)
       val conf = scheduler.sc.conf
       private val timeout = AkkaUtils.askTimeout(conf)
       private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
    +  // Submit tasks only after (registered executors / total expected executors) 
    +  // is equal to at least this value, that is double between 0 and 1.
    +  var minRegisteredRatio = conf.getDouble("spark.scheduler.minRegisteredExecutorsRatio", 0)
    --- End diff --
    
    Please add the new configs to the user docs - see docs/configuration.md


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1946] Submit stage after (configured ra...

Posted by li-zhihui <gi...@git.apache.org>.
Github user li-zhihui commented on a diff in the pull request:

    https://github.com/apache/spark/pull/900#discussion_r14166951
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala ---
    @@ -46,9 +46,14 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
     {
       // Use an atomic variable to track total number of cores in the cluster for simplicity and speed
       var totalCoreCount = new AtomicInteger(0)
    +  var totalExecutors = new AtomicInteger(0)
       val conf = scheduler.sc.conf
       private val timeout = AkkaUtils.askTimeout(conf)
       private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
    +  var minRegisteredRatio = conf.getDouble("spark.scheduler.minRegisteredExecutorsRatio", 0)
    +  val maxRegisteredWaitingTime = conf.getInt("spark.scheduler.maxRegisteredExecutorsWaitingTime", 30000)
    --- End diff --
    
    @tgravescs added.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1946] Submit stage after (configured nu...

Posted by li-zhihui <gi...@git.apache.org>.
Github user li-zhihui commented on the pull request:

    https://github.com/apache/spark/pull/900#issuecomment-46146641
  
    @tgravescs @sryza I add a commit support the feature(with percentage style) for yarn mode, test successfully in yarn 2.2.0.
    
    About default value of minRegisteredRatio, Yarn mode is 0.9, Standalong is 0.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---