Posted to commits@spark.apache.org by tg...@apache.org on 2014/07/14 22:33:01 UTC

git commit: [SPARK-1946] Submit tasks after (configured ratio of) executors have been registered

Repository: spark
Updated Branches:
  refs/heads/master d60b09bb6 -> 3dd8af7a6


[SPARK-1946] Submit tasks after (configured ratio of) executors have been registered

Because task submission and executor registration are asynchronous, tasks in early stages usually run without preferred locality.

A simple workaround is to sleep for a few seconds in the application so that executors have enough time to register.
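
For illustration, a minimal sketch of that workaround (the app name, sleep duration, and input path are hypothetical):

    import org.apache.spark.{SparkConf, SparkContext}

    val sc = new SparkContext(new SparkConf().setAppName("SleepWorkaround"))
    // Crude workaround: give executors time to register before the first stage.
    Thread.sleep(5000L)
    // The first job now has a better chance of seeing preferred locations.
    sc.textFile("hdfs:///path/to/input").count()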

This PR adds two configuration properties that make the TaskScheduler submit tasks only after a configurable fraction of the executors has registered.

# Submit tasks only after (registered executors / total executors) has reached this ratio; the default value is 0
spark.scheduler.minRegisteredExecutorsRatio = 0.8

# Regardless of whether minRegisteredExecutorsRatio has been reached, submit tasks once maxRegisteredWaitingTime (milliseconds) has elapsed; the default value is 30000
spark.scheduler.maxRegisteredExecutorsWaitingTime = 5000
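
As a sketch of how an application might set these properties (the app name is hypothetical and the values are illustrative, not recommendations):

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setAppName("LocalityAwareApp")
      // Wait until at least 80% of the expected executors have registered...
      .set("spark.scheduler.minRegisteredExecutorsRatio", "0.8")
      // ...but start scheduling after 5 seconds no matter what.
      .set("spark.scheduler.maxRegisteredExecutorsWaitingTime", "5000")
    val sc = new SparkContext(conf)

The same properties can also be set cluster-wide in conf/spark-defaults.conf rather than per application.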

Author: li-zhihui <zh...@intel.com>

Closes #900 from li-zhihui/master and squashes the following commits:

b9f8326 [li-zhihui] Add logs & edit docs
1ac08b1 [li-zhihui] Add new configs to user docs
22ead12 [li-zhihui] Move waitBackendReady to postStartHook
c6f0522 [li-zhihui] Bug fix: numExecutors wasn't set & use constant DEFAULT_NUMBER_EXECUTORS
4d6d847 [li-zhihui] Move waitBackendReady to TaskSchedulerImpl.start & some code refactor
0ecee9a [li-zhihui] Move waitBackendReady from DAGScheduler.submitStage to TaskSchedulerImpl.submitTasks
4261454 [li-zhihui] Add docs for new configs & code style
ce0868a [li-zhihui] Code style, rename configuration property name of minRegisteredRatio & maxRegisteredWaitingTime
6cfb9ec [li-zhihui] Code style, revert default minRegisteredRatio of yarn to 0, driver get --num-executors in yarn/alpha
812c33c [li-zhihui] Fix driver lost --num-executors option in yarn-cluster mode
e7b6272 [li-zhihui] support yarn-cluster
37f7dc2 [li-zhihui] support yarn mode(percentage style)
3f8c941 [li-zhihui] submit stage after (configured ratio of) executors have been registered


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3dd8af7a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3dd8af7a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3dd8af7a

Branch: refs/heads/master
Commit: 3dd8af7a6623201c28231f4b71f59ea4e9ae29bf
Parents: d60b09b
Author: li-zhihui <zh...@intel.com>
Authored: Mon Jul 14 15:32:49 2014 -0500
Committer: Thomas Graves <tg...@apache.org>
Committed: Mon Jul 14 15:32:49 2014 -0500

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   | 11 +++++-
 .../spark/scheduler/SchedulerBackend.scala      |  1 +
 .../spark/scheduler/TaskSchedulerImpl.scala     | 15 ++++++++
 .../cluster/CoarseGrainedSchedulerBackend.scala | 29 ++++++++++++++
 .../cluster/SparkDeploySchedulerBackend.scala   |  1 +
 docs/configuration.md                           | 19 ++++++++++
 .../spark/deploy/yarn/ApplicationMaster.scala   |  1 +
 .../yarn/ApplicationMasterArguments.scala       |  6 ++-
 .../cluster/YarnClientClusterScheduler.scala    |  2 +
 .../cluster/YarnClientSchedulerBackend.scala    |  1 +
 .../cluster/YarnClusterScheduler.scala          |  2 +
 .../cluster/YarnClusterSchedulerBackend.scala   | 40 ++++++++++++++++++++
 .../spark/deploy/yarn/ApplicationMaster.scala   |  1 +
 13 files changed, 127 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3dd8af7a/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 8819e73..8052499 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1531,7 +1531,16 @@ object SparkContext extends Logging {
             throw new SparkException("YARN mode not available ?", e)
           }
         }
-        val backend = new CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
+        val backend = try {
+          val clazz =
+            Class.forName("org.apache.spark.scheduler.cluster.YarnClusterSchedulerBackend")
+          val cons = clazz.getConstructor(classOf[TaskSchedulerImpl], classOf[SparkContext])
+          cons.newInstance(scheduler, sc).asInstanceOf[CoarseGrainedSchedulerBackend]
+        } catch {
+          case e: Exception => {
+            throw new SparkException("YARN mode not available ?", e)
+          }
+        }
         scheduler.initialize(backend)
         scheduler
 

http://git-wip-us.apache.org/repos/asf/spark/blob/3dd8af7a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
index 6a6d8e6..e41e0a9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
@@ -30,4 +30,5 @@ private[spark] trait SchedulerBackend {
 
   def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit =
     throw new UnsupportedOperationException
+  def isReady(): Boolean = true
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/3dd8af7a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 5ed2803..4b6d6da 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -145,6 +145,10 @@ private[spark] class TaskSchedulerImpl(
     }
   }
 
+  override def postStartHook() {
+    waitBackendReady()
+  }
+
   override def submitTasks(taskSet: TaskSet) {
     val tasks = taskSet.tasks
     logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
@@ -437,6 +441,17 @@ private[spark] class TaskSchedulerImpl(
 
   // By default, rack is unknown
   def getRackForHost(value: String): Option[String] = None
+
+  private def waitBackendReady(): Unit = {
+    if (backend.isReady) {
+      return
+    }
+    while (!backend.isReady) {
+      synchronized {
+        this.wait(100)
+      }
+    }
+  }
 }
 
 

http://git-wip-us.apache.org/repos/asf/spark/blob/3dd8af7a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 05d01b0..0f5545e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -46,9 +46,19 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
 {
   // Use an atomic variable to track total number of cores in the cluster for simplicity and speed
   var totalCoreCount = new AtomicInteger(0)
+  var totalExpectedExecutors = new AtomicInteger(0)
   val conf = scheduler.sc.conf
   private val timeout = AkkaUtils.askTimeout(conf)
   private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
+  // Submit tasks only after (registered executors / total expected executors)
+  // is at least this value, which is a double between 0 and 1.
+  var minRegisteredRatio = conf.getDouble("spark.scheduler.minRegisteredExecutorsRatio", 0)
+  if (minRegisteredRatio > 1) minRegisteredRatio = 1
+  // Even if minRegisteredExecutorsRatio has not been reached, submit tasks after this time (milliseconds).
+  val maxRegisteredWaitingTime =
+    conf.getInt("spark.scheduler.maxRegisteredExecutorsWaitingTime", 30000)
+  val createTime = System.currentTimeMillis()
+  var ready = if (minRegisteredRatio <= 0) true else false
 
   class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor {
     private val executorActor = new HashMap[String, ActorRef]
@@ -83,6 +93,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
           executorAddress(executorId) = sender.path.address
           addressToExecutorId(sender.path.address) = executorId
           totalCoreCount.addAndGet(cores)
+          if (executorActor.size >= totalExpectedExecutors.get() * minRegisteredRatio && !ready) {
+            ready = true
+            logInfo("SchedulerBackend is ready for scheduling beginning, registered executors: " +
+              executorActor.size + ", total expected executors: " + totalExpectedExecutors.get() +
+              ", minRegisteredExecutorsRatio: " + minRegisteredRatio)
+          }
           makeOffers()
         }
 
@@ -247,6 +263,19 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
         throw new SparkException("Error notifying standalone scheduler's driver actor", e)
     }
   }
+
+  override def isReady(): Boolean = {
+    if (ready) {
+      return true
+    }
+    if ((System.currentTimeMillis() - createTime) >= maxRegisteredWaitingTime) {
+      ready = true
+      logInfo("SchedulerBackend is ready for scheduling beginning after waiting " +
+        "maxRegisteredExecutorsWaitingTime: " + maxRegisteredWaitingTime)
+      return true
+    }
+    false
+  }
 }
 
 private[spark] object CoarseGrainedSchedulerBackend {

http://git-wip-us.apache.org/repos/asf/spark/blob/3dd8af7a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 9c07b3f..bf2dc88 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -95,6 +95,7 @@ private[spark] class SparkDeploySchedulerBackend(
 
   override def executorAdded(fullId: String, workerId: String, hostPort: String, cores: Int,
     memory: Int) {
+    totalExpectedExecutors.addAndGet(1)
     logInfo("Granted executor ID %s on hostPort %s with %d cores, %s RAM".format(
       fullId, hostPort, cores, Utils.megabytesToString(memory)))
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/3dd8af7a/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index 0aea23a..07aa4c0 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -699,6 +699,25 @@ Apart from these, the following properties are also available, and may be useful
     (in milliseconds)
   </td>
 </tr>
+<tr>
+  <td><code>spark.scheduler.minRegisteredExecutorsRatio</code></td>
+  <td>0</td>
+  <td>
+    The minimum ratio of registered executors (registered executors / total expected executors)
+    to wait for before scheduling begins. Specified as a double between 0 and 1.
+    Regardless of whether the minimum ratio of executors has been reached,
+    the maximum amount of time it will wait before scheduling begins is controlled by the config
+    <code>spark.scheduler.maxRegisteredExecutorsWaitingTime</code>.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.scheduler.maxRegisteredExecutorsWaitingTime</code></td>
+  <td>30000</td>
+  <td>
+    Maximum amount of time to wait for executors to register before scheduling begins
+    (in milliseconds).
+  </td>
+</tr>
 </table>
 
 #### Security

http://git-wip-us.apache.org/repos/asf/spark/blob/3dd8af7a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 438737f..062f946 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -184,6 +184,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
 
   private def startUserClass(): Thread = {
     logInfo("Starting the user JAR in a separate Thread")
+    System.setProperty("spark.executor.instances", args.numExecutors.toString)
     val mainMethod = Class.forName(
       args.userClass,
       false /* initialize */ ,

http://git-wip-us.apache.org/repos/asf/spark/blob/3dd8af7a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
index 25cc901..4c383ab 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
@@ -26,7 +26,7 @@ class ApplicationMasterArguments(val args: Array[String]) {
   var userArgs: Seq[String] = Seq[String]()
   var executorMemory = 1024
   var executorCores = 1
-  var numExecutors = 2
+  var numExecutors = ApplicationMasterArguments.DEFAULT_NUMBER_EXECUTORS
 
   parseArgs(args.toList)
   
@@ -93,3 +93,7 @@ class ApplicationMasterArguments(val args: Array[String]) {
     System.exit(exitCode)
   }
 }
+
+object ApplicationMasterArguments {
+  val DEFAULT_NUMBER_EXECUTORS = 2
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/3dd8af7a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
index 6b91e6b..15e8c21 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
@@ -40,8 +40,10 @@ private[spark] class YarnClientClusterScheduler(sc: SparkContext, conf: Configur
 
   override def postStartHook() {
 
+    super.postStartHook()
     // The yarn application is running, but the executor might not yet ready
     // Wait for a few seconds for the slaves to bootstrap and register with master - best case attempt
+    // TODO This should not be needed after waitBackendReady
     Thread.sleep(2000L)
     logInfo("YarnClientClusterScheduler.postStartHook done")
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/3dd8af7a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index fd2694f..0f9fdcf 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -75,6 +75,7 @@ private[spark] class YarnClientSchedulerBackend(
 
     logDebug("ClientArguments called with: " + argsArrayBuf)
     val args = new ClientArguments(argsArrayBuf.toArray, conf)
+    totalExpectedExecutors.set(args.numExecutors)
     client = new Client(args, conf)
     appId = client.runApp()
     waitForApp()

http://git-wip-us.apache.org/repos/asf/spark/blob/3dd8af7a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
index 39cdd2e..9ee53d7 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
@@ -48,9 +48,11 @@ private[spark] class YarnClusterScheduler(sc: SparkContext, conf: Configuration)
 
   override def postStartHook() {
     val sparkContextInitialized = ApplicationMaster.sparkContextInitialized(sc)
+    super.postStartHook()
     if (sparkContextInitialized){
       ApplicationMaster.waitForInitialAllocations()
       // Wait for a few seconds for the slaves to bootstrap and register with master - best case attempt
+      // TODO This should not be needed after waitBackendReady
       Thread.sleep(3000L)
     }
     logInfo("YarnClusterScheduler.postStartHook done")

http://git-wip-us.apache.org/repos/asf/spark/blob/3dd8af7a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
new file mode 100644
index 0000000..a04b08f
--- /dev/null
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster
+
+import org.apache.spark.SparkContext
+import org.apache.spark.deploy.yarn.ApplicationMasterArguments
+import org.apache.spark.scheduler.TaskSchedulerImpl
+import org.apache.spark.util.IntParam
+
+private[spark] class YarnClusterSchedulerBackend(
+    scheduler: TaskSchedulerImpl,
+    sc: SparkContext)
+  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) {
+
+  override def start() {
+    super.start()
+    var numExecutors = ApplicationMasterArguments.DEFAULT_NUMBER_EXECUTORS
+    if (System.getenv("SPARK_EXECUTOR_INSTANCES") != null) {
+      numExecutors = IntParam.unapply(System.getenv("SPARK_EXECUTOR_INSTANCES")).getOrElse(numExecutors)
+    }
+    // System property can override environment variable.
+    numExecutors = sc.getConf.getInt("spark.executor.instances", numExecutors)
+    totalExpectedExecutors.set(numExecutors)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/3dd8af7a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index ee1e9c9..1a24ec7 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -164,6 +164,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
 
   private def startUserClass(): Thread = {
     logInfo("Starting the user JAR in a separate Thread")
+    System.setProperty("spark.executor.instances", args.numExecutors.toString)
     val mainMethod = Class.forName(
       args.userClass,
       false,