You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2017/06/23 12:53:34 UTC
spark git commit: [SPARK-21115][CORE] If the cores left is less than
the coresPerExecutor, the cores left will not be allocated,
so it should not be checked in every schedule
Repository: spark
Updated Branches:
refs/heads/master 153dd49b7 -> acd208ee5
[SPARK-21115][CORE] If the cores left is less than the coresPerExecutor, the cores left will not be allocated, so it should not be checked in every schedule
## What changes were proposed in this pull request?
If we start an app with the parameters --total-executor-cores=4 and spark.executor.cores=3, the cores left is always 1, so the function org.apache.spark.deploy.master.Master#startExecutorsOnWorkers will try to allocate executors in every schedule.
Another question is whether it would be better to allocate another executor with 1 core for the cores left.
## How was this patch tested?
unit test
Author: 10129659 <ch...@zte.com.cn>
Closes #18322 from eatoncys/leftcores.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/acd208ee
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/acd208ee
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/acd208ee
Branch: refs/heads/master
Commit: acd208ee50b29bde4e097bf88761867b1d57a665
Parents: 153dd49
Author: 10129659 <ch...@zte.com.cn>
Authored: Fri Jun 23 20:53:26 2017 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Fri Jun 23 20:53:26 2017 +0800
----------------------------------------------------------------------
.../main/scala/org/apache/spark/SparkConf.scala | 11 ++++++++
.../org/apache/spark/deploy/master/Master.scala | 29 +++++++++++---------
2 files changed, 27 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/acd208ee/core/src/main/scala/org/apache/spark/SparkConf.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index ba7a65f..de2f475 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -543,6 +543,17 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
}
}
+ if (contains("spark.cores.max") && contains("spark.executor.cores")) {
+ val totalCores = getInt("spark.cores.max", 1)
+ val executorCores = getInt("spark.executor.cores", 1)
+ val leftCores = totalCores % executorCores
+ if (leftCores != 0) {
+ logWarning(s"Total executor cores: ${totalCores} is not " +
+ s"divisible by cores per executor: ${executorCores}, " +
+ s"the left cores: ${leftCores} will not be allocated")
+ }
+ }
+
val encryptionEnabled = get(NETWORK_ENCRYPTION_ENABLED) || get(SASL_ENCRYPTION_ENABLED)
require(!encryptionEnabled || get(NETWORK_AUTH_ENABLED),
s"${NETWORK_AUTH_ENABLED.key} must be enabled when enabling encryption.")
http://git-wip-us.apache.org/repos/asf/spark/blob/acd208ee/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index c192a0c..0dee25f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -659,19 +659,22 @@ private[deploy] class Master(
private def startExecutorsOnWorkers(): Unit = {
// Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
// in the queue, then the second app, etc.
- for (app <- waitingApps if app.coresLeft > 0) {
- val coresPerExecutor: Option[Int] = app.desc.coresPerExecutor
- // Filter out workers that don't have enough resources to launch an executor
- val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
- .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
- worker.coresFree >= coresPerExecutor.getOrElse(1))
- .sortBy(_.coresFree).reverse
- val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)
-
- // Now that we've decided how many cores to allocate on each worker, let's allocate them
- for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
- allocateWorkerResourceToExecutors(
- app, assignedCores(pos), coresPerExecutor, usableWorkers(pos))
+ for (app <- waitingApps) {
+ val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(1)
+ // If the cores left is less than the coresPerExecutor,the cores left will not be allocated
+ if (app.coresLeft >= coresPerExecutor) {
+ // Filter out workers that don't have enough resources to launch an executor
+ val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
+ .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
+ worker.coresFree >= coresPerExecutor)
+ .sortBy(_.coresFree).reverse
+ val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)
+
+ // Now that we've decided how many cores to allocate on each worker, let's allocate them
+ for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
+ allocateWorkerResourceToExecutors(
+ app, assignedCores(pos), app.desc.coresPerExecutor, usableWorkers(pos))
+ }
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org