You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2018/09/05 02:43:57 UTC
spark git commit: [SPARK-25300][CORE] Unified the configuration
parameter `spark.shuffle.service.enabled`
Repository: spark
Updated Branches:
refs/heads/master 103f51323 -> ca861fea2
[SPARK-25300][CORE] Unified the configuration parameter `spark.shuffle.service.enabled`
## What changes were proposed in this pull request?
The configuration parameter "spark.shuffle.service.enabled" has defined in `package.scala`, and it is also used in many place, so we can replace it with `SHUFFLE_SERVICE_ENABLED`.
and unified this configuration parameter "spark.shuffle.service.port" together.
## How was this patch tested?
N/A
Closes #22306 from 10110346/unifiedserviceenable.
Authored-by: liuxian <li...@zte.com.cn>
Signed-off-by: Wenchen Fan <we...@databricks.com>
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ca861fea
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ca861fea
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ca861fea
Branch: refs/heads/master
Commit: ca861fea21adc4e6ec95eced7076cb27fc86ea18
Parents: 103f513
Author: liuxian <li...@zte.com.cn>
Authored: Wed Sep 5 10:43:46 2018 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Wed Sep 5 10:43:46 2018 +0800
----------------------------------------------------------------------
.../scala/org/apache/spark/ExecutorAllocationManager.scala | 4 ++--
.../org/apache/spark/deploy/ExternalShuffleService.scala | 8 ++++----
.../scala/org/apache/spark/deploy/LocalSparkCluster.scala | 4 ++--
.../main/scala/org/apache/spark/deploy/worker/Worker.scala | 4 ++--
.../scala/org/apache/spark/internal/config/package.scala | 3 +++
.../main/scala/org/apache/spark/storage/BlockManager.scala | 7 ++++---
core/src/main/scala/org/apache/spark/util/Utils.scala | 4 ++--
.../org/apache/spark/ExecutorAllocationManagerSuite.scala | 3 ++-
.../scala/org/apache/spark/ExternalShuffleServiceSuite.scala | 5 +++--
.../spark/deploy/StandaloneDynamicAllocationSuite.scala | 2 +-
.../scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala | 5 +++--
.../scala/org/apache/spark/storage/BlockManagerSuite.scala | 4 ++--
.../cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala | 4 ++--
.../mesos/MesosCoarseGrainedSchedulerBackendSuite.scala | 2 +-
.../spark/deploy/yarn/YarnShuffleIntegrationSuite.scala | 6 +++---
15 files changed, 36 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/ca861fea/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 17b8863..c3e5b96 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -25,7 +25,7 @@ import scala.util.control.{ControlThrowable, NonFatal}
import com.codahale.metrics.{Gauge, MetricRegistry}
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{config, Logging}
import org.apache.spark.internal.config._
import org.apache.spark.metrics.source.Source
import org.apache.spark.scheduler._
@@ -212,7 +212,7 @@ private[spark] class ExecutorAllocationManager(
}
// Require external shuffle service for dynamic allocation
// Otherwise, we may lose shuffle files when killing executors
- if (!conf.getBoolean("spark.shuffle.service.enabled", false) && !testing) {
+ if (!conf.get(config.SHUFFLE_SERVICE_ENABLED) && !testing) {
throw new SparkException("Dynamic allocation of executors requires the external " +
"shuffle service. You may enable this through spark.shuffle.service.enabled.")
}
http://git-wip-us.apache.org/repos/asf/spark/blob/ca861fea/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
index b59a4fe..f6b3c37 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
@@ -22,7 +22,7 @@ import java.util.concurrent.CountDownLatch
import scala.collection.JavaConverters._
import org.apache.spark.{SecurityManager, SparkConf}
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{config, Logging}
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.network.TransportContext
import org.apache.spark.network.crypto.AuthServerBootstrap
@@ -45,8 +45,8 @@ class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityMana
protected val masterMetricsSystem =
MetricsSystem.createMetricsSystem("shuffleService", sparkConf, securityManager)
- private val enabled = sparkConf.getBoolean("spark.shuffle.service.enabled", false)
- private val port = sparkConf.getInt("spark.shuffle.service.port", 7337)
+ private val enabled = sparkConf.get(config.SHUFFLE_SERVICE_ENABLED)
+ private val port = sparkConf.get(config.SHUFFLE_SERVICE_PORT)
private val transportConf =
SparkTransportConf.fromSparkConf(sparkConf, "shuffle", numUsableCores = 0)
@@ -131,7 +131,7 @@ object ExternalShuffleService extends Logging {
// we override this value since this service is started from the command line
// and we assume the user really wants it to be running
- sparkConf.set("spark.shuffle.service.enabled", "true")
+ sparkConf.set(config.SHUFFLE_SERVICE_ENABLED.key, "true")
server = newShuffleService(sparkConf, securityManager)
server.start()
http://git-wip-us.apache.org/repos/asf/spark/blob/ca861fea/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
index 84aa894..be293f8 100644
--- a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
@@ -22,7 +22,7 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.spark.SparkConf
import org.apache.spark.deploy.master.Master
import org.apache.spark.deploy.worker.Worker
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{config, Logging}
import org.apache.spark.rpc.RpcEnv
import org.apache.spark.util.Utils
@@ -52,7 +52,7 @@ class LocalSparkCluster(
// Disable REST server on Master in this mode unless otherwise specified
val _conf = conf.clone()
.setIfMissing("spark.master.rest.enabled", "false")
- .set("spark.shuffle.service.enabled", "false")
+ .set(config.SHUFFLE_SERVICE_ENABLED.key, "false")
/* Start the Master */
val (rpcEnv, webUiPort, _) = Master.startRpcEnvAndEndpoint(localHostname, 0, 0, _conf)
http://git-wip-us.apache.org/repos/asf/spark/blob/ca861fea/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index cbd812a..d5ea252 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -36,7 +36,7 @@ import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.ExternalShuffleService
import org.apache.spark.deploy.master.{DriverState, Master}
import org.apache.spark.deploy.worker.ui.WorkerWebUI
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{config, Logging}
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.rpc._
import org.apache.spark.util.{SparkUncaughtExceptionHandler, ThreadUtils, Utils}
@@ -773,7 +773,7 @@ private[deploy] object Worker extends Logging {
// bound, we may launch no more than one external shuffle service on each host.
// When this happens, we should give explicit reason of failure instead of fail silently. For
// more detail see SPARK-20989.
- val externalShuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", false)
+ val externalShuffleServiceEnabled = conf.get(config.SHUFFLE_SERVICE_ENABLED)
val sparkWorkerInstances = scala.sys.env.getOrElse("SPARK_WORKER_INSTANCES", "1").toInt
require(externalShuffleServiceEnabled == false || sparkWorkerInstances <= 1,
"Starting multiple workers on one host is failed because we may launch no more than one " +
http://git-wip-us.apache.org/repos/asf/spark/blob/ca861fea/core/src/main/scala/org/apache/spark/internal/config/package.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 7c2f601..319e664 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -144,6 +144,9 @@ package object config {
private[spark] val SHUFFLE_SERVICE_ENABLED =
ConfigBuilder("spark.shuffle.service.enabled").booleanConf.createWithDefault(false)
+ private[spark] val SHUFFLE_SERVICE_PORT =
+ ConfigBuilder("spark.shuffle.service.port").intConf.createWithDefault(7337)
+
private[spark] val KEYTAB = ConfigBuilder("spark.yarn.keytab")
.doc("Location of user's keytab.")
.stringConf.createOptional
http://git-wip-us.apache.org/repos/asf/spark/blob/ca861fea/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index e7cdfab..f5c69ad 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -130,7 +130,7 @@ private[spark] class BlockManager(
extends BlockDataManager with BlockEvictionHandler with Logging {
private[spark] val externalShuffleServiceEnabled =
- conf.getBoolean("spark.shuffle.service.enabled", false)
+ conf.get(config.SHUFFLE_SERVICE_ENABLED)
private val chunkSize =
conf.getSizeAsBytes("spark.storage.memoryMapLimitForTests", Int.MaxValue.toString).toInt
private val remoteReadNioBufferConversion =
@@ -165,12 +165,13 @@ private[spark] class BlockManager(
// Port used by the external shuffle service. In Yarn mode, this may be already be
// set through the Hadoop configuration as the server is launched in the Yarn NM.
private val externalShuffleServicePort = {
- val tmpPort = Utils.getSparkOrYarnConfig(conf, "spark.shuffle.service.port", "7337").toInt
+ val tmpPort = Utils.getSparkOrYarnConfig(conf, config.SHUFFLE_SERVICE_PORT.key,
+ config.SHUFFLE_SERVICE_PORT.defaultValueString).toInt
if (tmpPort == 0) {
// for testing, we set "spark.shuffle.service.port" to 0 in the yarn config, so yarn finds
// an open port. But we still need to tell our spark apps the right port to use. So
// only if the yarn config has the port set to 0, we prefer the value in the spark config
- conf.get("spark.shuffle.service.port").toInt
+ conf.get(config.SHUFFLE_SERVICE_PORT.key).toInt
} else {
tmpPort
}
http://git-wip-us.apache.org/repos/asf/spark/blob/ca861fea/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 935bff9..15c958d 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -60,7 +60,7 @@ import org.slf4j.Logger
import org.apache.spark._
import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{config, Logging}
import org.apache.spark.internal.config._
import org.apache.spark.launcher.SparkLauncher
import org.apache.spark.network.util.JavaUtils
@@ -822,7 +822,7 @@ private[spark] object Utils extends Logging {
* logic of locating the local directories according to deployment mode.
*/
def getConfiguredLocalDirs(conf: SparkConf): Array[String] = {
- val shuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", false)
+ val shuffleServiceEnabled = conf.get(config.SHUFFLE_SERVICE_ENABLED)
if (isRunningInYarnContainer(conf)) {
// If we are in yarn mode, systems can have different disk layouts so we must set it
// to what Yarn on this system said was available. Note this assumes that Yarn has
http://git-wip-us.apache.org/repos/asf/spark/blob/ca861fea/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index 659ebb6..5c718cb 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -24,6 +24,7 @@ import org.mockito.Mockito.{mock, never, verify, when}
import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.internal.config
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.ExternalClusterManager
import org.apache.spark.scheduler.cluster.ExecutorInfo
@@ -1092,7 +1093,7 @@ class ExecutorAllocationManagerSuite
val maxExecutors = 2
val conf = new SparkConf()
.set("spark.dynamicAllocation.enabled", "true")
- .set("spark.shuffle.service.enabled", "true")
+ .set(config.SHUFFLE_SERVICE_ENABLED.key, "true")
.set("spark.dynamicAllocation.minExecutors", minExecutors.toString)
.set("spark.dynamicAllocation.maxExecutors", maxExecutors.toString)
.set("spark.dynamicAllocation.initialExecutors", initialExecutors.toString)
http://git-wip-us.apache.org/repos/asf/spark/blob/ca861fea/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
index 472952a..462d5f5 100644
--- a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark
import org.scalatest.BeforeAndAfterAll
+import org.apache.spark.internal.config
import org.apache.spark.network.TransportContext
import org.apache.spark.network.netty.SparkTransportConf
import org.apache.spark.network.server.TransportServer
@@ -42,8 +43,8 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with BeforeAndAfterAll {
server = transportContext.createServer()
conf.set("spark.shuffle.manager", "sort")
- conf.set("spark.shuffle.service.enabled", "true")
- conf.set("spark.shuffle.service.port", server.getPort.toString)
+ conf.set(config.SHUFFLE_SERVICE_ENABLED.key, "true")
+ conf.set(config.SHUFFLE_SERVICE_PORT.key, server.getPort.toString)
}
override def afterAll() {
http://git-wip-us.apache.org/repos/asf/spark/blob/ca861fea/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
index 27cc474..a1d2a12 100644
--- a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
@@ -458,7 +458,7 @@ class StandaloneDynamicAllocationSuite
val initialExecutorLimit = 1
val myConf = appConf
.set("spark.dynamicAllocation.enabled", "true")
- .set("spark.shuffle.service.enabled", "true")
+ .set(config.SHUFFLE_SERVICE_ENABLED.key, "true")
.set("spark.dynamicAllocation.initialExecutors", initialExecutorLimit.toString)
sc = new SparkContext(myConf)
val appId = sc.applicationId
http://git-wip-us.apache.org/repos/asf/spark/blob/ca861fea/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index cd00051..e0202fe 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -30,6 +30,7 @@ import org.scalatest.time.SpanSugar._
import org.apache.spark._
import org.apache.spark.broadcast.BroadcastManager
+import org.apache.spark.internal.config
import org.apache.spark.rdd.RDD
import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
import org.apache.spark.shuffle.{FetchFailedException, MetadataFetchFailedException}
@@ -406,7 +407,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
// reset the test context with the right shuffle service config
afterEach()
val conf = new SparkConf()
- conf.set("spark.shuffle.service.enabled", "true")
+ conf.set(config.SHUFFLE_SERVICE_ENABLED.key, "true")
conf.set("spark.files.fetchFailure.unRegisterOutputOnHost", "true")
init(conf)
runEvent(ExecutorAdded("exec-hostA1", "hostA"))
@@ -728,7 +729,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
// reset the test context with the right shuffle service config
afterEach()
val conf = new SparkConf()
- conf.set("spark.shuffle.service.enabled", shuffleServiceOn.toString)
+ conf.set(config.SHUFFLE_SERVICE_ENABLED.key, shuffleServiceOn.toString)
init(conf)
assert(sc.env.blockManager.externalShuffleServiceEnabled == shuffleServiceOn)
http://git-wip-us.apache.org/repos/asf/spark/blob/ca861fea/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 08172f0..dbee1f6 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -1377,8 +1377,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
val (server, shufflePort) = Utils.startServiceOnPort(candidatePort,
newShuffleServer, conf, "ShuffleServer")
- conf.set("spark.shuffle.service.enabled", "true")
- conf.set("spark.shuffle.service.port", shufflePort.toString)
+ conf.set(SHUFFLE_SERVICE_ENABLED.key, "true")
+ conf.set(SHUFFLE_SERVICE_PORT.key, shufflePort.toString)
conf.set(SHUFFLE_REGISTRATION_TIMEOUT.key, "40")
conf.set(SHUFFLE_REGISTRATION_MAX_ATTEMPTS.key, "1")
var e = intercept[SparkException] {
http://git-wip-us.apache.org/repos/asf/spark/blob/ca861fea/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index 1ce2f81..178de30 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -102,7 +102,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
// If shuffle service is enabled, the Spark driver will register with the shuffle service.
// This is for cleaning up shuffle files reliably.
- private val shuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", false)
+ private val shuffleServiceEnabled = conf.get(config.SHUFFLE_SERVICE_ENABLED)
// Cores we have acquired with each Mesos task ID
private val coresByTaskId = new mutable.HashMap[String, Int]
@@ -624,7 +624,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
"External shuffle client was not instantiated even though shuffle service is enabled.")
// TODO: Remove this and allow the MesosExternalShuffleService to detect
// framework termination when new Mesos Framework HTTP API is available.
- val externalShufflePort = conf.getInt("spark.shuffle.service.port", 7337)
+ val externalShufflePort = conf.get(config.SHUFFLE_SERVICE_PORT)
logDebug(s"Connecting to shuffle service on slave $slaveId, " +
s"host ${slave.hostname}, port $externalShufflePort for app ${conf.getAppId}")
http://git-wip-us.apache.org/repos/asf/spark/blob/ca861fea/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
index b790c7c..da33d85 100644
--- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
@@ -262,7 +262,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
}
test("mesos doesn't register twice with the same shuffle service") {
- setBackend(Map("spark.shuffle.service.enabled" -> "true"))
+ setBackend(Map(SHUFFLE_SERVICE_ENABLED.key -> "true"))
val (mem, cpu) = (backend.executorMemory(sc), 4)
val offer1 = createOffer("o1", "s1", mem, cpu)
http://git-wip-us.apache.org/repos/asf/spark/blob/ca861fea/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala
index 01db796..37bccaf 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala
@@ -44,7 +44,7 @@ class YarnShuffleIntegrationSuite extends BaseYarnClusterSuite {
yarnConfig.set(YarnConfiguration.NM_AUX_SERVICES, "spark_shuffle")
yarnConfig.set(YarnConfiguration.NM_AUX_SERVICE_FMT.format("spark_shuffle"),
classOf[YarnShuffleService].getCanonicalName)
- yarnConfig.set("spark.shuffle.service.port", "0")
+ yarnConfig.set(SHUFFLE_SERVICE_PORT.key, "0")
yarnConfig
}
@@ -54,8 +54,8 @@ class YarnShuffleIntegrationSuite extends BaseYarnClusterSuite {
logInfo("Shuffle service port = " + shuffleServicePort)
Map(
- "spark.shuffle.service.enabled" -> "true",
- "spark.shuffle.service.port" -> shuffleServicePort.toString,
+ SHUFFLE_SERVICE_ENABLED.key -> "true",
+ SHUFFLE_SERVICE_PORT.key -> shuffleServicePort.toString,
MAX_EXECUTOR_FAILURES.key -> "1"
)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org