You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by an...@apache.org on 2016/02/04 19:32:38 UTC
spark git commit: [SPARK-13162] Standalone mode does not respect initial executors
Repository: spark
Updated Branches:
refs/heads/master 62a7c2838 -> 4120bcbaf
[SPARK-13162] Standalone mode does not respect initial executors
Before this change, the Master always set an application's initial executor limit to infinity (`Integer.MAX_VALUE`), so a user-specified `spark.dynamicAllocation.initialExecutors` had no effect. This is similar to #11047, but for standalone mode.
Author: Andrew Or <an...@databricks.com>
Closes #11054 from andrewor14/standalone-da-initial.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4120bcba
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4120bcba
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4120bcba
Branch: refs/heads/master
Commit: 4120bcbaffe92da40486b469334119ed12199f4f
Parents: 62a7c28
Author: Andrew Or <an...@databricks.com>
Authored: Thu Feb 4 10:32:16 2016 -0800
Committer: Andrew Or <an...@databricks.com>
Committed: Thu Feb 4 10:32:32 2016 -0800
----------------------------------------------------------------------
.../apache/spark/ExecutorAllocationManager.scala | 2 ++
.../spark/deploy/ApplicationDescription.scala | 3 +++
.../spark/deploy/master/ApplicationInfo.scala | 2 +-
.../cluster/SparkDeploySchedulerBackend.scala | 16 ++++++++++++----
.../deploy/StandaloneDynamicAllocationSuite.scala | 17 ++++++++++++++++-
5 files changed, 34 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/4120bcba/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 3431fc1..db143d7 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -231,6 +231,8 @@ private[spark] class ExecutorAllocationManager(
}
}
executor.scheduleAtFixedRate(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS)
+
+ client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount)
}
/**
http://git-wip-us.apache.org/repos/asf/spark/blob/4120bcba/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
index 78bbd5c..c5c5c60 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
@@ -29,6 +29,9 @@ private[spark] case class ApplicationDescription(
// short name of compression codec used when writing event logs, if any (e.g. lzf)
eventLogCodec: Option[String] = None,
coresPerExecutor: Option[Int] = None,
+ // number of executors this application wants to start with,
+ // only used if dynamic allocation is enabled
+ initialExecutorLimit: Option[Int] = None,
user: String = System.getProperty("user.name", "<unknown>")) {
override def toString: String = "ApplicationDescription(" + name + ")"
http://git-wip-us.apache.org/repos/asf/spark/blob/4120bcba/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
index 7e2cf95..4ffb528 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
@@ -65,7 +65,7 @@ private[spark] class ApplicationInfo(
appSource = new ApplicationSource(this)
nextExecutorId = 0
removedExecutors = new ArrayBuffer[ExecutorDesc]
- executorLimit = Integer.MAX_VALUE
+ executorLimit = desc.initialExecutorLimit.getOrElse(Integer.MAX_VALUE)
appUIUrlAtHistoryServer = None
}
http://git-wip-us.apache.org/repos/asf/spark/blob/4120bcba/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 16f3316..d209645 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -19,11 +19,11 @@ package org.apache.spark.scheduler.cluster
import java.util.concurrent.Semaphore
-import org.apache.spark.{Logging, SparkConf, SparkContext, SparkEnv}
+import org.apache.spark.{Logging, SparkConf, SparkContext}
import org.apache.spark.deploy.{ApplicationDescription, Command}
import org.apache.spark.deploy.client.{AppClient, AppClientListener}
import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle}
-import org.apache.spark.rpc.{RpcAddress, RpcEndpointAddress}
+import org.apache.spark.rpc.RpcEndpointAddress
import org.apache.spark.scheduler._
import org.apache.spark.util.Utils
@@ -89,8 +89,16 @@ private[spark] class SparkDeploySchedulerBackend(
args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)
val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("")
val coresPerExecutor = conf.getOption("spark.executor.cores").map(_.toInt)
- val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory,
- command, appUIAddress, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor)
+ // If we're using dynamic allocation, set our initial executor limit to 0 for now.
+ // ExecutorAllocationManager will send the real initial limit to the Master later.
+ val initialExecutorLimit =
+ if (Utils.isDynamicAllocationEnabled(conf)) {
+ Some(0)
+ } else {
+ None
+ }
+ val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
+ appUIAddress, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor, initialExecutorLimit)
client = new AppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
client.start()
launcherBackend.setState(SparkAppHandle.State.SUBMITTED)
http://git-wip-us.apache.org/repos/asf/spark/blob/4120bcba/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
index fdada07..b7ff5c9 100644
--- a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
@@ -447,7 +447,23 @@ class StandaloneDynamicAllocationSuite
apps = getApplications()
// kill executor successfully
assert(apps.head.executors.size === 1)
+ }
+ test("initial executor limit") {
+ val initialExecutorLimit = 1
+ val myConf = appConf
+ .set("spark.dynamicAllocation.enabled", "true")
+ .set("spark.shuffle.service.enabled", "true")
+ .set("spark.dynamicAllocation.initialExecutors", initialExecutorLimit.toString)
+ sc = new SparkContext(myConf)
+ val appId = sc.applicationId
+ eventually(timeout(10.seconds), interval(10.millis)) {
+ val apps = getApplications()
+ assert(apps.size === 1)
+ assert(apps.head.id === appId)
+ assert(apps.head.executors.size === initialExecutorLimit)
+ assert(apps.head.getExecutorLimit === initialExecutorLimit)
+ }
}
// ===============================
@@ -540,7 +556,6 @@ class StandaloneDynamicAllocationSuite
val missingExecutors = masterExecutors.toSet.diff(driverExecutors.toSet).toSeq.sorted
missingExecutors.foreach { id =>
// Fake an executor registration so the driver knows about us
- val port = System.currentTimeMillis % 65536
val endpointRef = mock(classOf[RpcEndpointRef])
val mockAddress = mock(classOf[RpcAddress])
when(endpointRef.address).thenReturn(mockAddress)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org