You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by tg...@apache.org on 2014/07/14 22:33:01 UTC
git commit: [SPARK-1946] Submit tasks after (configured ratio)
executors have been registered
Repository: spark
Updated Branches:
refs/heads/master d60b09bb6 -> 3dd8af7a6
[SPARK-1946] Submit tasks after (configured ratio) executors have been registered
Because submitting tasks and registering executors are asynchronous, in most situations, early stages' tasks run without preferred locality.
A simple workaround is to sleep for a few seconds in the application, so that executors have enough time to register.
This PR adds 2 configuration properties that make the TaskScheduler submit tasks only after a configured ratio of executors has been registered.
\# Submit tasks only after (registered executors / total executors) has reached this ratio; default value is 0
spark.scheduler.minRegisteredExecutorsRatio = 0.8
\# Regardless of whether minRegisteredExecutorsRatio has been reached, submit tasks after maxRegisteredWaitingTime (in milliseconds); default value is 30000
spark.scheduler.maxRegisteredExecutorsWaitingTime = 5000
Author: li-zhihui <zh...@intel.com>
Closes #900 from li-zhihui/master and squashes the following commits:
b9f8326 [li-zhihui] Add logs & edit docs
1ac08b1 [li-zhihui] Add new configs to user docs
22ead12 [li-zhihui] Move waitBackendReady to postStartHook
c6f0522 [li-zhihui] Bug fix: numExecutors wasn't set & use constant DEFAULT_NUMBER_EXECUTORS
4d6d847 [li-zhihui] Move waitBackendReady to TaskSchedulerImpl.start & some code refactor
0ecee9a [li-zhihui] Move waitBackendReady from DAGScheduler.submitStage to TaskSchedulerImpl.submitTasks
4261454 [li-zhihui] Add docs for new configs & code style
ce0868a [li-zhihui] Code style, rename configuration property name of minRegisteredRatio & maxRegisteredWaitingTime
6cfb9ec [li-zhihui] Code style, revert default minRegisteredRatio of yarn to 0, driver get --num-executors in yarn/alpha
812c33c [li-zhihui] Fix driver lost --num-executors option in yarn-cluster mode
e7b6272 [li-zhihui] support yarn-cluster
37f7dc2 [li-zhihui] support yarn mode(percentage style)
3f8c941 [li-zhihui] submit stage after (configured ratio of) executors have been registered
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3dd8af7a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3dd8af7a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3dd8af7a
Branch: refs/heads/master
Commit: 3dd8af7a6623201c28231f4b71f59ea4e9ae29bf
Parents: d60b09b
Author: li-zhihui <zh...@intel.com>
Authored: Mon Jul 14 15:32:49 2014 -0500
Committer: Thomas Graves <tg...@apache.org>
Committed: Mon Jul 14 15:32:49 2014 -0500
----------------------------------------------------------------------
.../scala/org/apache/spark/SparkContext.scala | 11 +++++-
.../spark/scheduler/SchedulerBackend.scala | 1 +
.../spark/scheduler/TaskSchedulerImpl.scala | 15 ++++++++
.../cluster/CoarseGrainedSchedulerBackend.scala | 29 ++++++++++++++
.../cluster/SparkDeploySchedulerBackend.scala | 1 +
docs/configuration.md | 19 ++++++++++
.../spark/deploy/yarn/ApplicationMaster.scala | 1 +
.../yarn/ApplicationMasterArguments.scala | 6 ++-
.../cluster/YarnClientClusterScheduler.scala | 2 +
.../cluster/YarnClientSchedulerBackend.scala | 1 +
.../cluster/YarnClusterScheduler.scala | 2 +
.../cluster/YarnClusterSchedulerBackend.scala | 40 ++++++++++++++++++++
.../spark/deploy/yarn/ApplicationMaster.scala | 1 +
13 files changed, 127 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/3dd8af7a/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 8819e73..8052499 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1531,7 +1531,16 @@ object SparkContext extends Logging {
throw new SparkException("YARN mode not available ?", e)
}
}
- val backend = new CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
+ val backend = try {
+ val clazz =
+ Class.forName("org.apache.spark.scheduler.cluster.YarnClusterSchedulerBackend")
+ val cons = clazz.getConstructor(classOf[TaskSchedulerImpl], classOf[SparkContext])
+ cons.newInstance(scheduler, sc).asInstanceOf[CoarseGrainedSchedulerBackend]
+ } catch {
+ case e: Exception => {
+ throw new SparkException("YARN mode not available ?", e)
+ }
+ }
scheduler.initialize(backend)
scheduler
http://git-wip-us.apache.org/repos/asf/spark/blob/3dd8af7a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
index 6a6d8e6..e41e0a9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
@@ -30,4 +30,5 @@ private[spark] trait SchedulerBackend {
def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit =
throw new UnsupportedOperationException
+ def isReady(): Boolean = true
}
http://git-wip-us.apache.org/repos/asf/spark/blob/3dd8af7a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 5ed2803..4b6d6da 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -145,6 +145,10 @@ private[spark] class TaskSchedulerImpl(
}
}
+ override def postStartHook() {
+ waitBackendReady()
+ }
+
override def submitTasks(taskSet: TaskSet) {
val tasks = taskSet.tasks
logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
@@ -437,6 +441,17 @@ private[spark] class TaskSchedulerImpl(
// By default, rack is unknown
def getRackForHost(value: String): Option[String] = None
+
+ private def waitBackendReady(): Unit = {
+ if (backend.isReady) {
+ return
+ }
+ while (!backend.isReady) {
+ synchronized {
+ this.wait(100)
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/spark/blob/3dd8af7a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 05d01b0..0f5545e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -46,9 +46,19 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
{
// Use an atomic variable to track total number of cores in the cluster for simplicity and speed
var totalCoreCount = new AtomicInteger(0)
+ var totalExpectedExecutors = new AtomicInteger(0)
val conf = scheduler.sc.conf
private val timeout = AkkaUtils.askTimeout(conf)
private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
+ // Submit tasks only after (registered executors / total expected executors)
+ // is equal to at least this value, that is double between 0 and 1.
+ var minRegisteredRatio = conf.getDouble("spark.scheduler.minRegisteredExecutorsRatio", 0)
+ if (minRegisteredRatio > 1) minRegisteredRatio = 1
+ // Whatever minRegisteredExecutorsRatio is arrived, submit tasks after the time(milliseconds).
+ val maxRegisteredWaitingTime =
+ conf.getInt("spark.scheduler.maxRegisteredExecutorsWaitingTime", 30000)
+ val createTime = System.currentTimeMillis()
+ var ready = if (minRegisteredRatio <= 0) true else false
class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor {
private val executorActor = new HashMap[String, ActorRef]
@@ -83,6 +93,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
executorAddress(executorId) = sender.path.address
addressToExecutorId(sender.path.address) = executorId
totalCoreCount.addAndGet(cores)
+ if (executorActor.size >= totalExpectedExecutors.get() * minRegisteredRatio && !ready) {
+ ready = true
+ logInfo("SchedulerBackend is ready for scheduling beginning, registered executors: " +
+ executorActor.size + ", total expected executors: " + totalExpectedExecutors.get() +
+ ", minRegisteredExecutorsRatio: " + minRegisteredRatio)
+ }
makeOffers()
}
@@ -247,6 +263,19 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
throw new SparkException("Error notifying standalone scheduler's driver actor", e)
}
}
+
+ override def isReady(): Boolean = {
+ if (ready) {
+ return true
+ }
+ if ((System.currentTimeMillis() - createTime) >= maxRegisteredWaitingTime) {
+ ready = true
+ logInfo("SchedulerBackend is ready for scheduling beginning after waiting " +
+ "maxRegisteredExecutorsWaitingTime: " + maxRegisteredWaitingTime)
+ return true
+ }
+ false
+ }
}
private[spark] object CoarseGrainedSchedulerBackend {
http://git-wip-us.apache.org/repos/asf/spark/blob/3dd8af7a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 9c07b3f..bf2dc88 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -95,6 +95,7 @@ private[spark] class SparkDeploySchedulerBackend(
override def executorAdded(fullId: String, workerId: String, hostPort: String, cores: Int,
memory: Int) {
+ totalExpectedExecutors.addAndGet(1)
logInfo("Granted executor ID %s on hostPort %s with %d cores, %s RAM".format(
fullId, hostPort, cores, Utils.megabytesToString(memory)))
}
http://git-wip-us.apache.org/repos/asf/spark/blob/3dd8af7a/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index 0aea23a..07aa4c0 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -699,6 +699,25 @@ Apart from these, the following properties are also available, and may be useful
(in milliseconds)
</td>
</tr>
+<tr>
+ <td><code>spark.scheduler.minRegisteredExecutorsRatio</code></td>
+ <td>0</td>
+ <td>
+ The minimum ratio of registered executors (registered executors / total expected executors)
+ to wait for before scheduling begins. Specified as a double between 0 and 1.
+ Regardless of whether the minimum ratio of executors has been reached,
+ the maximum amount of time it will wait before scheduling begins is controlled by config
+ <code>spark.scheduler.maxRegisteredExecutorsWaitingTime</code>
+ </td>
+</tr>
+<tr>
+ <td><code>spark.scheduler.maxRegisteredExecutorsWaitingTime</code></td>
+ <td>30000</td>
+ <td>
+ Maximum amount of time to wait for executors to register before scheduling begins
+ (in milliseconds).
+ </td>
+</tr>
</table>
#### Security
http://git-wip-us.apache.org/repos/asf/spark/blob/3dd8af7a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 438737f..062f946 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -184,6 +184,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
private def startUserClass(): Thread = {
logInfo("Starting the user JAR in a separate Thread")
+ System.setProperty("spark.executor.instances", args.numExecutors.toString)
val mainMethod = Class.forName(
args.userClass,
false /* initialize */ ,
http://git-wip-us.apache.org/repos/asf/spark/blob/3dd8af7a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
index 25cc901..4c383ab 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
@@ -26,7 +26,7 @@ class ApplicationMasterArguments(val args: Array[String]) {
var userArgs: Seq[String] = Seq[String]()
var executorMemory = 1024
var executorCores = 1
- var numExecutors = 2
+ var numExecutors = ApplicationMasterArguments.DEFAULT_NUMBER_EXECUTORS
parseArgs(args.toList)
@@ -93,3 +93,7 @@ class ApplicationMasterArguments(val args: Array[String]) {
System.exit(exitCode)
}
}
+
+object ApplicationMasterArguments {
+ val DEFAULT_NUMBER_EXECUTORS = 2
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/3dd8af7a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
index 6b91e6b..15e8c21 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
@@ -40,8 +40,10 @@ private[spark] class YarnClientClusterScheduler(sc: SparkContext, conf: Configur
override def postStartHook() {
+ super.postStartHook()
// The yarn application is running, but the executor might not yet ready
// Wait for a few seconds for the slaves to bootstrap and register with master - best case attempt
+ // TODO It needn't after waitBackendReady
Thread.sleep(2000L)
logInfo("YarnClientClusterScheduler.postStartHook done")
}
http://git-wip-us.apache.org/repos/asf/spark/blob/3dd8af7a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index fd2694f..0f9fdcf 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -75,6 +75,7 @@ private[spark] class YarnClientSchedulerBackend(
logDebug("ClientArguments called with: " + argsArrayBuf)
val args = new ClientArguments(argsArrayBuf.toArray, conf)
+ totalExpectedExecutors.set(args.numExecutors)
client = new Client(args, conf)
appId = client.runApp()
waitForApp()
http://git-wip-us.apache.org/repos/asf/spark/blob/3dd8af7a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
index 39cdd2e..9ee53d7 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
@@ -48,9 +48,11 @@ private[spark] class YarnClusterScheduler(sc: SparkContext, conf: Configuration)
override def postStartHook() {
val sparkContextInitialized = ApplicationMaster.sparkContextInitialized(sc)
+ super.postStartHook()
if (sparkContextInitialized){
ApplicationMaster.waitForInitialAllocations()
// Wait for a few seconds for the slaves to bootstrap and register with master - best case attempt
+ // TODO It needn't after waitBackendReady
Thread.sleep(3000L)
}
logInfo("YarnClusterScheduler.postStartHook done")
http://git-wip-us.apache.org/repos/asf/spark/blob/3dd8af7a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
new file mode 100644
index 0000000..a04b08f
--- /dev/null
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster
+
+import org.apache.spark.SparkContext
+import org.apache.spark.deploy.yarn.ApplicationMasterArguments
+import org.apache.spark.scheduler.TaskSchedulerImpl
+import org.apache.spark.util.IntParam
+
+private[spark] class YarnClusterSchedulerBackend(
+ scheduler: TaskSchedulerImpl,
+ sc: SparkContext)
+ extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) {
+
+ override def start() {
+ super.start()
+ var numExecutors = ApplicationMasterArguments.DEFAULT_NUMBER_EXECUTORS
+ if (System.getenv("SPARK_EXECUTOR_INSTANCES") != null) {
+ numExecutors = IntParam.unapply(System.getenv("SPARK_EXECUTOR_INSTANCES")).getOrElse(numExecutors)
+ }
+ // System property can override environment variable.
+ numExecutors = sc.getConf.getInt("spark.executor.instances", numExecutors)
+ totalExpectedExecutors.set(numExecutors)
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/3dd8af7a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index ee1e9c9..1a24ec7 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -164,6 +164,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
private def startUserClass(): Thread = {
logInfo("Starting the user JAR in a separate Thread")
+ System.setProperty("spark.executor.instances", args.numExecutors.toString)
val mainMethod = Class.forName(
args.userClass,
false,