You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2018/09/05 02:43:57 UTC

spark git commit: [SPARK-25300][CORE] Unified the configuration parameter `spark.shuffle.service.enabled`

Repository: spark
Updated Branches:
  refs/heads/master 103f51323 -> ca861fea2


[SPARK-25300][CORE] Unified the configuration parameter `spark.shuffle.service.enabled`

## What changes were proposed in this pull request?

The configuration parameter "spark.shuffle.service.enabled" is already defined in `package.scala` as `SHUFFLE_SERVICE_ENABLED`, but the raw string key is still used in many places, so this change replaces those uses with `SHUFFLE_SERVICE_ENABLED`.
It also adds a `SHUFFLE_SERVICE_PORT` config entry for "spark.shuffle.service.port" and unifies the uses of that parameter in the same way.

## How was this patch tested?
N/A

Closes #22306 from 10110346/unifiedserviceenable.

Authored-by: liuxian <li...@zte.com.cn>
Signed-off-by: Wenchen Fan <we...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ca861fea
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ca861fea
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ca861fea

Branch: refs/heads/master
Commit: ca861fea21adc4e6ec95eced7076cb27fc86ea18
Parents: 103f513
Author: liuxian <li...@zte.com.cn>
Authored: Wed Sep 5 10:43:46 2018 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Wed Sep 5 10:43:46 2018 +0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/ExecutorAllocationManager.scala   | 4 ++--
 .../org/apache/spark/deploy/ExternalShuffleService.scala     | 8 ++++----
 .../scala/org/apache/spark/deploy/LocalSparkCluster.scala    | 4 ++--
 .../main/scala/org/apache/spark/deploy/worker/Worker.scala   | 4 ++--
 .../scala/org/apache/spark/internal/config/package.scala     | 3 +++
 .../main/scala/org/apache/spark/storage/BlockManager.scala   | 7 ++++---
 core/src/main/scala/org/apache/spark/util/Utils.scala        | 4 ++--
 .../org/apache/spark/ExecutorAllocationManagerSuite.scala    | 3 ++-
 .../scala/org/apache/spark/ExternalShuffleServiceSuite.scala | 5 +++--
 .../spark/deploy/StandaloneDynamicAllocationSuite.scala      | 2 +-
 .../scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala | 5 +++--
 .../scala/org/apache/spark/storage/BlockManagerSuite.scala   | 4 ++--
 .../cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala   | 4 ++--
 .../mesos/MesosCoarseGrainedSchedulerBackendSuite.scala      | 2 +-
 .../spark/deploy/yarn/YarnShuffleIntegrationSuite.scala      | 6 +++---
 15 files changed, 36 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ca861fea/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 17b8863..c3e5b96 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -25,7 +25,7 @@ import scala.util.control.{ControlThrowable, NonFatal}
 
 import com.codahale.metrics.{Gauge, MetricRegistry}
 
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.internal.config._
 import org.apache.spark.metrics.source.Source
 import org.apache.spark.scheduler._
@@ -212,7 +212,7 @@ private[spark] class ExecutorAllocationManager(
     }
     // Require external shuffle service for dynamic allocation
     // Otherwise, we may lose shuffle files when killing executors
-    if (!conf.getBoolean("spark.shuffle.service.enabled", false) && !testing) {
+    if (!conf.get(config.SHUFFLE_SERVICE_ENABLED) && !testing) {
       throw new SparkException("Dynamic allocation of executors requires the external " +
         "shuffle service. You may enable this through spark.shuffle.service.enabled.")
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/ca861fea/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
index b59a4fe..f6b3c37 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
@@ -22,7 +22,7 @@ import java.util.concurrent.CountDownLatch
 import scala.collection.JavaConverters._
 
 import org.apache.spark.{SecurityManager, SparkConf}
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.network.TransportContext
 import org.apache.spark.network.crypto.AuthServerBootstrap
@@ -45,8 +45,8 @@ class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityMana
   protected val masterMetricsSystem =
     MetricsSystem.createMetricsSystem("shuffleService", sparkConf, securityManager)
 
-  private val enabled = sparkConf.getBoolean("spark.shuffle.service.enabled", false)
-  private val port = sparkConf.getInt("spark.shuffle.service.port", 7337)
+  private val enabled = sparkConf.get(config.SHUFFLE_SERVICE_ENABLED)
+  private val port = sparkConf.get(config.SHUFFLE_SERVICE_PORT)
 
   private val transportConf =
     SparkTransportConf.fromSparkConf(sparkConf, "shuffle", numUsableCores = 0)
@@ -131,7 +131,7 @@ object ExternalShuffleService extends Logging {
 
     // we override this value since this service is started from the command line
     // and we assume the user really wants it to be running
-    sparkConf.set("spark.shuffle.service.enabled", "true")
+    sparkConf.set(config.SHUFFLE_SERVICE_ENABLED.key, "true")
     server = newShuffleService(sparkConf, securityManager)
     server.start()
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ca861fea/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
index 84aa894..be293f8 100644
--- a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
@@ -22,7 +22,7 @@ import scala.collection.mutable.ArrayBuffer
 import org.apache.spark.SparkConf
 import org.apache.spark.deploy.master.Master
 import org.apache.spark.deploy.worker.Worker
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.rpc.RpcEnv
 import org.apache.spark.util.Utils
 
@@ -52,7 +52,7 @@ class LocalSparkCluster(
     // Disable REST server on Master in this mode unless otherwise specified
     val _conf = conf.clone()
       .setIfMissing("spark.master.rest.enabled", "false")
-      .set("spark.shuffle.service.enabled", "false")
+      .set(config.SHUFFLE_SERVICE_ENABLED.key, "false")
 
     /* Start the Master */
     val (rpcEnv, webUiPort, _) = Master.startRpcEnvAndEndpoint(localHostname, 0, 0, _conf)

http://git-wip-us.apache.org/repos/asf/spark/blob/ca861fea/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index cbd812a..d5ea252 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -36,7 +36,7 @@ import org.apache.spark.deploy.DeployMessages._
 import org.apache.spark.deploy.ExternalShuffleService
 import org.apache.spark.deploy.master.{DriverState, Master}
 import org.apache.spark.deploy.worker.ui.WorkerWebUI
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.rpc._
 import org.apache.spark.util.{SparkUncaughtExceptionHandler, ThreadUtils, Utils}
@@ -773,7 +773,7 @@ private[deploy] object Worker extends Logging {
     // bound, we may launch no more than one external shuffle service on each host.
     // When this happens, we should give explicit reason of failure instead of fail silently. For
     // more detail see SPARK-20989.
-    val externalShuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", false)
+    val externalShuffleServiceEnabled = conf.get(config.SHUFFLE_SERVICE_ENABLED)
     val sparkWorkerInstances = scala.sys.env.getOrElse("SPARK_WORKER_INSTANCES", "1").toInt
     require(externalShuffleServiceEnabled == false || sparkWorkerInstances <= 1,
       "Starting multiple workers on one host is failed because we may launch no more than one " +

http://git-wip-us.apache.org/repos/asf/spark/blob/ca861fea/core/src/main/scala/org/apache/spark/internal/config/package.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 7c2f601..319e664 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -144,6 +144,9 @@ package object config {
   private[spark] val SHUFFLE_SERVICE_ENABLED =
     ConfigBuilder("spark.shuffle.service.enabled").booleanConf.createWithDefault(false)
 
+  private[spark] val SHUFFLE_SERVICE_PORT =
+    ConfigBuilder("spark.shuffle.service.port").intConf.createWithDefault(7337)
+
   private[spark] val KEYTAB = ConfigBuilder("spark.yarn.keytab")
     .doc("Location of user's keytab.")
     .stringConf.createOptional

http://git-wip-us.apache.org/repos/asf/spark/blob/ca861fea/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index e7cdfab..f5c69ad 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -130,7 +130,7 @@ private[spark] class BlockManager(
   extends BlockDataManager with BlockEvictionHandler with Logging {
 
   private[spark] val externalShuffleServiceEnabled =
-    conf.getBoolean("spark.shuffle.service.enabled", false)
+    conf.get(config.SHUFFLE_SERVICE_ENABLED)
   private val chunkSize =
     conf.getSizeAsBytes("spark.storage.memoryMapLimitForTests", Int.MaxValue.toString).toInt
   private val remoteReadNioBufferConversion =
@@ -165,12 +165,13 @@ private[spark] class BlockManager(
   // Port used by the external shuffle service. In Yarn mode, this may be already be
   // set through the Hadoop configuration as the server is launched in the Yarn NM.
   private val externalShuffleServicePort = {
-    val tmpPort = Utils.getSparkOrYarnConfig(conf, "spark.shuffle.service.port", "7337").toInt
+    val tmpPort = Utils.getSparkOrYarnConfig(conf, config.SHUFFLE_SERVICE_PORT.key,
+      config.SHUFFLE_SERVICE_PORT.defaultValueString).toInt
     if (tmpPort == 0) {
       // for testing, we set "spark.shuffle.service.port" to 0 in the yarn config, so yarn finds
       // an open port.  But we still need to tell our spark apps the right port to use.  So
       // only if the yarn config has the port set to 0, we prefer the value in the spark config
-      conf.get("spark.shuffle.service.port").toInt
+      conf.get(config.SHUFFLE_SERVICE_PORT.key).toInt
     } else {
       tmpPort
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/ca861fea/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 935bff9..15c958d 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -60,7 +60,7 @@ import org.slf4j.Logger
 
 import org.apache.spark._
 import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.internal.config._
 import org.apache.spark.launcher.SparkLauncher
 import org.apache.spark.network.util.JavaUtils
@@ -822,7 +822,7 @@ private[spark] object Utils extends Logging {
    * logic of locating the local directories according to deployment mode.
    */
   def getConfiguredLocalDirs(conf: SparkConf): Array[String] = {
-    val shuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", false)
+    val shuffleServiceEnabled = conf.get(config.SHUFFLE_SERVICE_ENABLED)
     if (isRunningInYarnContainer(conf)) {
       // If we are in yarn mode, systems can have different disk layouts so we must set it
       // to what Yarn on this system said was available. Note this assumes that Yarn has

http://git-wip-us.apache.org/repos/asf/spark/blob/ca861fea/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index 659ebb6..5c718cb 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -24,6 +24,7 @@ import org.mockito.Mockito.{mock, never, verify, when}
 import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
 
 import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.internal.config
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.ExternalClusterManager
 import org.apache.spark.scheduler.cluster.ExecutorInfo
@@ -1092,7 +1093,7 @@ class ExecutorAllocationManagerSuite
     val maxExecutors = 2
     val conf = new SparkConf()
       .set("spark.dynamicAllocation.enabled", "true")
-      .set("spark.shuffle.service.enabled", "true")
+      .set(config.SHUFFLE_SERVICE_ENABLED.key, "true")
       .set("spark.dynamicAllocation.minExecutors", minExecutors.toString)
       .set("spark.dynamicAllocation.maxExecutors", maxExecutors.toString)
       .set("spark.dynamicAllocation.initialExecutors", initialExecutors.toString)

http://git-wip-us.apache.org/repos/asf/spark/blob/ca861fea/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
index 472952a..462d5f5 100644
--- a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark
 
 import org.scalatest.BeforeAndAfterAll
 
+import org.apache.spark.internal.config
 import org.apache.spark.network.TransportContext
 import org.apache.spark.network.netty.SparkTransportConf
 import org.apache.spark.network.server.TransportServer
@@ -42,8 +43,8 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with BeforeAndAfterAll {
     server = transportContext.createServer()
 
     conf.set("spark.shuffle.manager", "sort")
-    conf.set("spark.shuffle.service.enabled", "true")
-    conf.set("spark.shuffle.service.port", server.getPort.toString)
+    conf.set(config.SHUFFLE_SERVICE_ENABLED.key, "true")
+    conf.set(config.SHUFFLE_SERVICE_PORT.key, server.getPort.toString)
   }
 
   override def afterAll() {

http://git-wip-us.apache.org/repos/asf/spark/blob/ca861fea/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
index 27cc474..a1d2a12 100644
--- a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
@@ -458,7 +458,7 @@ class StandaloneDynamicAllocationSuite
     val initialExecutorLimit = 1
     val myConf = appConf
       .set("spark.dynamicAllocation.enabled", "true")
-      .set("spark.shuffle.service.enabled", "true")
+      .set(config.SHUFFLE_SERVICE_ENABLED.key, "true")
       .set("spark.dynamicAllocation.initialExecutors", initialExecutorLimit.toString)
     sc = new SparkContext(myConf)
     val appId = sc.applicationId

http://git-wip-us.apache.org/repos/asf/spark/blob/ca861fea/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index cd00051..e0202fe 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -30,6 +30,7 @@ import org.scalatest.time.SpanSugar._
 
 import org.apache.spark._
 import org.apache.spark.broadcast.BroadcastManager
+import org.apache.spark.internal.config
 import org.apache.spark.rdd.RDD
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
 import org.apache.spark.shuffle.{FetchFailedException, MetadataFetchFailedException}
@@ -406,7 +407,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     // reset the test context with the right shuffle service config
     afterEach()
     val conf = new SparkConf()
-    conf.set("spark.shuffle.service.enabled", "true")
+    conf.set(config.SHUFFLE_SERVICE_ENABLED.key, "true")
     conf.set("spark.files.fetchFailure.unRegisterOutputOnHost", "true")
     init(conf)
     runEvent(ExecutorAdded("exec-hostA1", "hostA"))
@@ -728,7 +729,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
       // reset the test context with the right shuffle service config
       afterEach()
       val conf = new SparkConf()
-      conf.set("spark.shuffle.service.enabled", shuffleServiceOn.toString)
+      conf.set(config.SHUFFLE_SERVICE_ENABLED.key, shuffleServiceOn.toString)
       init(conf)
       assert(sc.env.blockManager.externalShuffleServiceEnabled == shuffleServiceOn)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ca861fea/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 08172f0..dbee1f6 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -1377,8 +1377,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     val (server, shufflePort) = Utils.startServiceOnPort(candidatePort,
       newShuffleServer, conf, "ShuffleServer")
 
-    conf.set("spark.shuffle.service.enabled", "true")
-    conf.set("spark.shuffle.service.port", shufflePort.toString)
+    conf.set(SHUFFLE_SERVICE_ENABLED.key, "true")
+    conf.set(SHUFFLE_SERVICE_PORT.key, shufflePort.toString)
     conf.set(SHUFFLE_REGISTRATION_TIMEOUT.key, "40")
     conf.set(SHUFFLE_REGISTRATION_MAX_ATTEMPTS.key, "1")
     var e = intercept[SparkException] {

http://git-wip-us.apache.org/repos/asf/spark/blob/ca861fea/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index 1ce2f81..178de30 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -102,7 +102,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
 
   // If shuffle service is enabled, the Spark driver will register with the shuffle service.
   // This is for cleaning up shuffle files reliably.
-  private val shuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", false)
+  private val shuffleServiceEnabled = conf.get(config.SHUFFLE_SERVICE_ENABLED)
 
   // Cores we have acquired with each Mesos task ID
   private val coresByTaskId = new mutable.HashMap[String, Int]
@@ -624,7 +624,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
           "External shuffle client was not instantiated even though shuffle service is enabled.")
         // TODO: Remove this and allow the MesosExternalShuffleService to detect
         // framework termination when new Mesos Framework HTTP API is available.
-        val externalShufflePort = conf.getInt("spark.shuffle.service.port", 7337)
+        val externalShufflePort = conf.get(config.SHUFFLE_SERVICE_PORT)
 
         logDebug(s"Connecting to shuffle service on slave $slaveId, " +
             s"host ${slave.hostname}, port $externalShufflePort for app ${conf.getAppId}")

http://git-wip-us.apache.org/repos/asf/spark/blob/ca861fea/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
index b790c7c..da33d85 100644
--- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
@@ -262,7 +262,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
   }
 
   test("mesos doesn't register twice with the same shuffle service") {
-    setBackend(Map("spark.shuffle.service.enabled" -> "true"))
+    setBackend(Map(SHUFFLE_SERVICE_ENABLED.key -> "true"))
     val (mem, cpu) = (backend.executorMemory(sc), 4)
 
     val offer1 = createOffer("o1", "s1", mem, cpu)

http://git-wip-us.apache.org/repos/asf/spark/blob/ca861fea/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala
index 01db796..37bccaf 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala
@@ -44,7 +44,7 @@ class YarnShuffleIntegrationSuite extends BaseYarnClusterSuite {
     yarnConfig.set(YarnConfiguration.NM_AUX_SERVICES, "spark_shuffle")
     yarnConfig.set(YarnConfiguration.NM_AUX_SERVICE_FMT.format("spark_shuffle"),
       classOf[YarnShuffleService].getCanonicalName)
-    yarnConfig.set("spark.shuffle.service.port", "0")
+    yarnConfig.set(SHUFFLE_SERVICE_PORT.key, "0")
     yarnConfig
   }
 
@@ -54,8 +54,8 @@ class YarnShuffleIntegrationSuite extends BaseYarnClusterSuite {
     logInfo("Shuffle service port = " + shuffleServicePort)
 
     Map(
-      "spark.shuffle.service.enabled" -> "true",
-      "spark.shuffle.service.port" -> shuffleServicePort.toString,
+      SHUFFLE_SERVICE_ENABLED.key -> "true",
+      SHUFFLE_SERVICE_PORT.key -> shuffleServicePort.toString,
       MAX_EXECUTOR_FAILURES.key -> "1"
     )
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org