You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2019/01/17 02:57:52 UTC

[spark] branch master updated: [SPARK-26466][CORE] Use ConfigEntry for hardcoded configs for submit categories.

This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 38f0307  [SPARK-26466][CORE] Use ConfigEntry for hardcoded configs for submit categories.
38f0307 is described below

commit 38f030725c561979ca98b2a6cc7ca6c02a1f80ed
Author: Jungtaek Lim (HeartSaVioR) <ka...@gmail.com>
AuthorDate: Wed Jan 16 20:57:21 2019 -0600

    [SPARK-26466][CORE] Use ConfigEntry for hardcoded configs for submit categories.
    
    ## What changes were proposed in this pull request?
    
    The PR makes hardcoded configs below to use `ConfigEntry`.
    
    * spark.kryo
    * spark.kryoserializer
    * spark.serializer
    * spark.jars
    * spark.files
    * spark.submit
    * spark.deploy
    * spark.worker
    
    This patch doesn't change configs which are not relevant to SparkConf (e.g. system properties).
    
    ## How was this patch tested?
    
    Existing tests.
    
    Closes #23532 from HeartSaVioR/SPARK-26466-v2.
    
    Authored-by: Jungtaek Lim (HeartSaVioR) <ka...@gmail.com>
    Signed-off-by: Sean Owen <se...@databricks.com>
---
 .../main/scala/org/apache/spark/SparkConf.scala    | 21 +++----
 .../main/scala/org/apache/spark/SparkContext.scala |  4 +-
 .../src/main/scala/org/apache/spark/SparkEnv.scala |  9 ++-
 .../main/scala/org/apache/spark/api/r/RUtils.scala |  3 +-
 .../apache/spark/deploy/FaultToleranceTest.scala   |  6 +-
 .../org/apache/spark/deploy/SparkCuratorUtil.scala |  3 +-
 .../org/apache/spark/deploy/SparkSubmit.scala      | 29 +++++----
 .../org/apache/spark/deploy/master/Master.scala    | 48 +++++++--------
 .../spark/deploy/master/RecoveryModeFactory.scala  |  7 ++-
 .../master/ZooKeeperLeaderElectionAgent.scala      |  5 +-
 .../deploy/master/ZooKeeperPersistenceEngine.scala | 15 ++---
 .../apache/spark/deploy/worker/DriverRunner.scala  |  6 +-
 .../org/apache/spark/deploy/worker/Worker.scala    | 20 +++----
 .../spark/deploy/worker/WorkerArguments.scala      |  5 +-
 .../spark/deploy/worker/ui/WorkerWebUI.scala       |  2 -
 .../org/apache/spark/internal/config/Deploy.scala  | 68 ++++++++++++++++++++++
 .../org/apache/spark/internal/config/Kryo.scala    | 57 ++++++++++++++++++
 .../org/apache/spark/internal/config/Worker.scala  | 63 ++++++++++++++++++++
 .../org/apache/spark/internal/config/package.scala | 31 ++++++++++
 .../apache/spark/serializer/JavaSerializer.scala   |  5 +-
 .../apache/spark/serializer/KryoSerializer.scala   | 29 ++++-----
 .../main/scala/org/apache/spark/util/Utils.scala   | 12 ++--
 .../org/apache/spark/JobCancellationSuite.scala    |  3 +-
 .../test/scala/org/apache/spark/ShuffleSuite.scala |  3 +-
 .../scala/org/apache/spark/SparkConfSuite.scala    | 31 +++++-----
 .../spark/api/python/PythonBroadcastSuite.scala    |  3 +-
 .../apache/spark/broadcast/BroadcastSuite.scala    |  3 +-
 .../org/apache/spark/deploy/SparkSubmitSuite.scala | 34 +++++------
 .../apache/spark/deploy/master/MasterSuite.scala   | 14 ++---
 .../deploy/master/PersistenceEngineSuite.scala     |  3 +-
 .../deploy/rest/SubmitRestProtocolSuite.scala      |  3 +-
 .../apache/spark/deploy/worker/WorkerSuite.scala   |  9 +--
 .../apache/spark/scheduler/MapStatusSuite.scala    |  2 +-
 .../serializer/GenericAvroSerializerSuite.scala    |  3 +-
 .../apache/spark/serializer/KryoBenchmark.scala    |  8 ++-
 .../spark/serializer/KryoSerializerBenchmark.scala |  8 ++-
 .../KryoSerializerDistributedSuite.scala           |  4 +-
 .../KryoSerializerResizableOutputSuite.scala       | 14 +++--
 .../spark/serializer/KryoSerializerSuite.scala     | 44 +++++++-------
 .../serializer/SerializerPropertiesSuite.scala     |  3 +-
 .../serializer/UnsafeKryoSerializerSuite.scala     |  6 +-
 .../spark/storage/FlatmapIteratorSuite.scala       |  4 +-
 .../scala/org/apache/spark/util/UtilsSuite.scala   |  3 +-
 .../collection/ExternalAppendOnlyMapSuite.scala    |  4 +-
 .../util/collection/ExternalSorterSuite.scala      |  7 ++-
 .../apache/spark/ml/attribute/AttributeSuite.scala |  3 +-
 .../apache/spark/ml/feature/InstanceSuite.scala    |  3 +-
 .../spark/ml/feature/LabeledPointSuite.scala       |  3 +-
 .../apache/spark/ml/tree/impl/TreePointSuite.scala |  3 +-
 .../spark/mllib/clustering/KMeansSuite.scala       |  3 +-
 .../apache/spark/mllib/feature/Word2VecSuite.scala | 19 ++++--
 .../apache/spark/mllib/linalg/MatricesSuite.scala  |  3 +-
 .../apache/spark/mllib/linalg/VectorsSuite.scala   |  3 +-
 .../spark/mllib/regression/LabeledPointSuite.scala |  3 +-
 .../distribution/MultivariateGaussianSuite.scala   |  3 +-
 .../k8s/features/BasicDriverFeatureStep.scala      | 11 ++--
 .../k8s/features/DriverCommandFeatureStep.scala    | 13 +++--
 .../k8s/features/BasicDriverFeatureStepSuite.scala |  6 +-
 .../integrationtest/KubernetesTestComponents.scala |  3 +-
 .../deploy/mesos/MesosClusterDispatcher.scala      |  1 +
 .../org/apache/spark/deploy/mesos/config.scala     | 12 ----
 .../mesos/MesosClusterPersistenceEngine.scala      | 11 ++--
 .../cluster/mesos/MesosClusterScheduler.scala      | 18 +++---
 .../org/apache/spark/deploy/yarn/Client.scala      |  6 +-
 .../org/apache/spark/deploy/yarn/ClientSuite.scala |  3 +-
 .../spark/deploy/yarn/YarnClusterSuite.scala       |  2 +-
 .../sql/DatasetSerializerRegistratorSuite.scala    |  3 +-
 ...ExternalAppendOnlyUnsafeRowArrayBenchmark.scala |  4 +-
 .../sql/execution/joins/HashedRelationSuite.scala  |  5 +-
 69 files changed, 528 insertions(+), 280 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 22bcb81..b596be0 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -28,6 +28,7 @@ import org.apache.avro.{Schema, SchemaNormalization}
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.History._
+import org.apache.spark.internal.config.Kryo._
 import org.apache.spark.serializer.KryoSerializer
 import org.apache.spark.util.Utils
 
@@ -123,7 +124,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
   /** Set JAR files to distribute to the cluster. */
   def setJars(jars: Seq[String]): SparkConf = {
     for (jar <- jars if (jar == null)) logWarning("null jar passed to SparkContext constructor")
-    set("spark.jars", jars.filter(_ != null).mkString(","))
+    set(JARS, jars.filter(_ != null))
   }
 
   /** Set JAR files to distribute to the cluster. (Java-friendly version.) */
@@ -201,12 +202,12 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
    */
   def registerKryoClasses(classes: Array[Class[_]]): SparkConf = {
     val allClassNames = new LinkedHashSet[String]()
-    allClassNames ++= get("spark.kryo.classesToRegister", "").split(',').map(_.trim)
+    allClassNames ++= get(KRYO_CLASSES_TO_REGISTER).map(_.trim)
       .filter(!_.isEmpty)
     allClassNames ++= classes.map(_.getName)
 
-    set("spark.kryo.classesToRegister", allClassNames.mkString(","))
-    set("spark.serializer", classOf[KryoSerializer].getName)
+    set(KRYO_CLASSES_TO_REGISTER, allClassNames.toSeq)
+    set(SERIALIZER, classOf[KryoSerializer].getName)
     this
   }
 
@@ -547,20 +548,20 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
         case "yarn-cluster" =>
           logWarning(warning)
           set("spark.master", "yarn")
-          set("spark.submit.deployMode", "cluster")
+          set(SUBMIT_DEPLOY_MODE, "cluster")
         case "yarn-client" =>
           logWarning(warning)
           set("spark.master", "yarn")
-          set("spark.submit.deployMode", "client")
+          set(SUBMIT_DEPLOY_MODE, "client")
         case _ => // Any other unexpected master will be checked when creating scheduler backend.
       }
     }
 
-    if (contains("spark.submit.deployMode")) {
-      get("spark.submit.deployMode") match {
+    if (contains(SUBMIT_DEPLOY_MODE)) {
+      get(SUBMIT_DEPLOY_MODE) match {
         case "cluster" | "client" =>
-        case e => throw new SparkException("spark.submit.deployMode can only be \"cluster\" or " +
-          "\"client\".")
+        case e => throw new SparkException(s"${SUBMIT_DEPLOY_MODE.key} can only be " +
+          "\"cluster\" or \"client\".")
       }
     }
 
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 3bbf9f3..c9afc79 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -229,7 +229,7 @@ class SparkContext(config: SparkConf) extends Logging {
   def jars: Seq[String] = _jars
   def files: Seq[String] = _files
   def master: String = _conf.get("spark.master")
-  def deployMode: String = _conf.getOption("spark.submit.deployMode").getOrElse("client")
+  def deployMode: String = _conf.get(SUBMIT_DEPLOY_MODE)
   def appName: String = _conf.get("spark.app.name")
 
   private[spark] def isEventLogEnabled: Boolean = _conf.get(EVENT_LOG_ENABLED)
@@ -2640,7 +2640,7 @@ object SparkContext extends Logging {
       case SparkMasterRegex.LOCAL_N_REGEX(threads) => convertToInt(threads)
       case SparkMasterRegex.LOCAL_N_FAILURES_REGEX(threads, _) => convertToInt(threads)
       case "yarn" =>
-        if (conf != null && conf.getOption("spark.submit.deployMode").contains("cluster")) {
+        if (conf != null && conf.get(SUBMIT_DEPLOY_MODE) == "cluster") {
           conf.getInt(DRIVER_CORES.key, 0)
         } else {
           0
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index ba5ed8a..4d7542c 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -274,14 +274,13 @@ object SparkEnv extends Logging {
       }
     }
 
-    // Create an instance of the class named by the given SparkConf property, or defaultClassName
+    // Create an instance of the class named by the given SparkConf property
     // if the property is not set, possibly initializing it with our conf
-    def instantiateClassFromConf[T](propertyName: String, defaultClassName: String): T = {
-      instantiateClass[T](conf.get(propertyName, defaultClassName))
+    def instantiateClassFromConf[T](propertyName: ConfigEntry[String]): T = {
+      instantiateClass[T](conf.get(propertyName))
     }
 
-    val serializer = instantiateClassFromConf[Serializer](
-      "spark.serializer", "org.apache.spark.serializer.JavaSerializer")
+    val serializer = instantiateClassFromConf[Serializer](SERIALIZER)
     logDebug(s"Using serializer: ${serializer.getClass}")
 
     val serializerManager = new SerializerManager(serializer, conf, ioEncryptionKey)
diff --git a/core/src/main/scala/org/apache/spark/api/r/RUtils.scala b/core/src/main/scala/org/apache/spark/api/r/RUtils.scala
index 9bf35af..6832223 100644
--- a/core/src/main/scala/org/apache/spark/api/r/RUtils.scala
+++ b/core/src/main/scala/org/apache/spark/api/r/RUtils.scala
@@ -23,6 +23,7 @@ import java.util.Arrays
 import org.apache.spark.{SparkEnv, SparkException}
 import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.api.python.PythonUtils
+import org.apache.spark.internal.config._
 
 private[spark] object RUtils {
   // Local path where R binary packages built from R source code contained in the spark
@@ -63,7 +64,7 @@ private[spark] object RUtils {
         (sys.props("spark.master"), sys.props("spark.submit.deployMode"))
       } else {
         val sparkConf = SparkEnv.get.conf
-        (sparkConf.get("spark.master"), sparkConf.get("spark.submit.deployMode", "client"))
+        (sparkConf.get("spark.master"), sparkConf.get(SUBMIT_DEPLOY_MODE))
       }
 
     val isYarnCluster = master != null && master.contains("yarn") && deployMode == "cluster"
diff --git a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
index 0679bdf..a662430 100644
--- a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
@@ -60,7 +60,7 @@ import org.apache.spark.util.{ThreadUtils, Utils}
 private object FaultToleranceTest extends App with Logging {
 
   private val conf = new SparkConf()
-  private val ZK_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark")
+  private val zkDir = conf.get(config.Deploy.ZOOKEEPER_DIRECTORY).getOrElse("/spark")
 
   private val masters = ListBuffer[TestMasterInfo]()
   private val workers = ListBuffer[TestWorkerInfo]()
@@ -87,8 +87,8 @@ private object FaultToleranceTest extends App with Logging {
     terminateCluster()
 
     // Clear ZK directories in between tests (for speed purposes)
-    SparkCuratorUtil.deleteRecursive(zk, ZK_DIR + "/spark_leader")
-    SparkCuratorUtil.deleteRecursive(zk, ZK_DIR + "/master_status")
+    SparkCuratorUtil.deleteRecursive(zk, zkDir + "/spark_leader")
+    SparkCuratorUtil.deleteRecursive(zk, zkDir + "/master_status")
   }
 
   test("sanity-basic") {
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkCuratorUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkCuratorUtil.scala
index 8247110..8118c01 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkCuratorUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkCuratorUtil.scala
@@ -25,6 +25,7 @@ import org.apache.zookeeper.KeeperException
 
 import org.apache.spark.SparkConf
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.Deploy.ZOOKEEPER_URL
 
 private[spark] object SparkCuratorUtil extends Logging {
 
@@ -35,7 +36,7 @@ private[spark] object SparkCuratorUtil extends Logging {
 
   def newClient(
       conf: SparkConf,
-      zkUrlConf: String = "spark.deploy.zookeeper.url"): CuratorFramework = {
+      zkUrlConf: String = ZOOKEEPER_URL.key): CuratorFramework = {
     val ZK_URL = conf.get(zkUrlConf)
     val zk = CuratorFrameworkFactory.newClient(ZK_URL,
       ZK_SESSION_TIMEOUT_MILLIS, ZK_CONNECTION_TIMEOUT_MILLIS,
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 57a8bdf..b403cc4 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -437,7 +437,7 @@ private[spark] class SparkSubmit extends Logging {
     }
 
     if (localPyFiles != null) {
-      sparkConf.set("spark.submit.pyFiles", localPyFiles)
+      sparkConf.set(SUBMIT_PYTHON_FILES, localPyFiles.split(",").toSeq)
     }
 
     // In YARN mode for an R app, add the SparkR package archive and the R package
@@ -614,11 +614,11 @@ private[spark] class SparkSubmit extends Logging {
     // For YARN cluster mode, the jar is already distributed on each node as "app.jar"
     // For python and R files, the primary resource is already distributed as a regular file
     if (!isYarnCluster && !args.isPython && !args.isR) {
-      var jars = sparkConf.getOption("spark.jars").map(x => x.split(",").toSeq).getOrElse(Seq.empty)
+      var jars = sparkConf.get(JARS)
       if (isUserJar(args.primaryResource)) {
         jars = jars ++ Seq(args.primaryResource)
       }
-      sparkConf.set("spark.jars", jars.mkString(","))
+      sparkConf.set(JARS, jars)
     }
 
     // In standalone cluster mode, use the REST client to submit the application (Spark 1.3+).
@@ -681,7 +681,7 @@ private[spark] class SparkSubmit extends Logging {
         // Second argument is main class
         childArgs += (args.primaryResource, "")
         if (args.pyFiles != null) {
-          sparkConf.set("spark.submit.pyFiles", args.pyFiles)
+          sparkConf.set(SUBMIT_PYTHON_FILES, args.pyFiles.split(",").toSeq)
         }
       } else if (args.isR) {
         // Second argument is main class
@@ -748,18 +748,17 @@ private[spark] class SparkSubmit extends Logging {
     // Resolve and format python file paths properly before adding them to the PYTHONPATH.
     // The resolving part is redundant in the case of --py-files, but necessary if the user
     // explicitly sets `spark.submit.pyFiles` in his/her default properties file.
-    sparkConf.getOption("spark.submit.pyFiles").foreach { pyFiles =>
-      val resolvedPyFiles = Utils.resolveURIs(pyFiles)
-      val formattedPyFiles = if (!isYarnCluster && !isMesosCluster) {
-        PythonRunner.formatPaths(resolvedPyFiles).mkString(",")
-      } else {
-        // Ignoring formatting python path in yarn and mesos cluster mode, these two modes
-        // support dealing with remote python files, they could distribute and add python files
-        // locally.
-        resolvedPyFiles
-      }
-      sparkConf.set("spark.submit.pyFiles", formattedPyFiles)
+    val pyFiles = sparkConf.get(SUBMIT_PYTHON_FILES)
+    val resolvedPyFiles = Utils.resolveURIs(pyFiles.mkString(","))
+    val formattedPyFiles = if (!isYarnCluster && !isMesosCluster) {
+      PythonRunner.formatPaths(resolvedPyFiles).mkString(",")
+    } else {
+      // Ignoring formatting python path in yarn and mesos cluster mode, these two modes
+      // support dealing with remote python files, they could distribute and add python files
+      // locally.
+      resolvedPyFiles
     }
+    sparkConf.set(SUBMIT_PYTHON_FILES, formattedPyFiles.split(",").toSeq)
 
     (childArgs, childClasspath, sparkConf, childMainClass)
   }
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 32f6d1f..b26da8a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -34,7 +34,9 @@ import org.apache.spark.deploy.master.ui.MasterWebUI
 import org.apache.spark.deploy.rest.StandaloneRestServer
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
+import org.apache.spark.internal.config.Deploy._
 import org.apache.spark.internal.config.UI._
+import org.apache.spark.internal.config.Worker._
 import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.rpc._
 import org.apache.spark.serializer.{JavaSerializer, Serializer}
@@ -56,12 +58,12 @@ private[deploy] class Master(
   // For application IDs
   private def createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US)
 
-  private val WORKER_TIMEOUT_MS = conf.getLong("spark.worker.timeout", 60) * 1000
-  private val RETAINED_APPLICATIONS = conf.getInt("spark.deploy.retainedApplications", 200)
-  private val RETAINED_DRIVERS = conf.getInt("spark.deploy.retainedDrivers", 200)
-  private val REAPER_ITERATIONS = conf.getInt("spark.dead.worker.persistence", 15)
-  private val RECOVERY_MODE = conf.get("spark.deploy.recoveryMode", "NONE")
-  private val MAX_EXECUTOR_RETRIES = conf.getInt("spark.deploy.maxExecutorRetries", 10)
+  private val workerTimeoutMs = conf.get(WORKER_TIMEOUT) * 1000
+  private val retainedApplications = conf.get(RETAINED_APPLICATIONS)
+  private val retainedDrivers = conf.get(RETAINED_DRIVERS)
+  private val reaperIterations = conf.get(REAPER_ITERATIONS)
+  private val recoveryMode = conf.get(RECOVERY_MODE)
+  private val maxExecutorRetries = conf.get(MAX_EXECUTOR_RETRIES)
 
   val workers = new HashSet[WorkerInfo]
   val idToApp = new HashMap[String, ApplicationInfo]
@@ -113,13 +115,13 @@ private[deploy] class Master(
   // As a temporary workaround before better ways of configuring memory, we allow users to set
   // a flag that will perform round-robin scheduling across the nodes (spreading out each app
   // among all the nodes) instead of trying to consolidate each app onto a small # of nodes.
-  private val spreadOutApps = conf.getBoolean("spark.deploy.spreadOut", true)
+  private val spreadOutApps = conf.get(SPREAD_OUT_APPS)
 
   // Default maxCores for applications that don't specify it (i.e. pass Int.MaxValue)
-  private val defaultCores = conf.getInt("spark.deploy.defaultCores", Int.MaxValue)
+  private val defaultCores = conf.get(DEFAULT_CORES)
   val reverseProxy = conf.get(UI_REVERSE_PROXY)
   if (defaultCores < 1) {
-    throw new SparkException("spark.deploy.defaultCores must be positive")
+    throw new SparkException(s"${DEFAULT_CORES.key} must be positive")
   }
 
   // Alternative application submission gateway that is stable across Spark versions
@@ -151,7 +153,7 @@ private[deploy] class Master(
       override def run(): Unit = Utils.tryLogNonFatalError {
         self.send(CheckForWorkerTimeOut)
       }
-    }, 0, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS)
+    }, 0, workerTimeoutMs, TimeUnit.MILLISECONDS)
 
     if (restServerEnabled) {
       val port = conf.get(MASTER_REST_SERVER_PORT)
@@ -168,7 +170,7 @@ private[deploy] class Master(
     applicationMetricsSystem.getServletHandlers.foreach(webUi.attachHandler)
 
     val serializer = new JavaSerializer(conf)
-    val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match {
+    val (persistenceEngine_, leaderElectionAgent_) = recoveryMode match {
       case "ZOOKEEPER" =>
         logInfo("Persisting recovery state to ZooKeeper")
         val zkFactory =
@@ -179,7 +181,7 @@ private[deploy] class Master(
           new FileSystemRecoveryModeFactory(conf, serializer)
         (fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this))
       case "CUSTOM" =>
-        val clazz = Utils.classForName(conf.get("spark.deploy.recoveryMode.factory"))
+        val clazz = Utils.classForName(conf.get(RECOVERY_MODE_FACTORY))
         val factory = clazz.getConstructor(classOf[SparkConf], classOf[Serializer])
           .newInstance(conf, serializer)
           .asInstanceOf[StandaloneRecoveryModeFactory]
@@ -233,7 +235,7 @@ private[deploy] class Master(
           override def run(): Unit = Utils.tryLogNonFatalError {
             self.send(CompleteRecovery)
           }
-        }, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS)
+        }, workerTimeoutMs, TimeUnit.MILLISECONDS)
       }
 
     case CompleteRecovery => completeRecovery()
@@ -311,8 +313,8 @@ private[deploy] class Master(
             // Important note: this code path is not exercised by tests, so be very careful when
             // changing this `if` condition.
             if (!normalExit
-                && appInfo.incrementRetryCount() >= MAX_EXECUTOR_RETRIES
-                && MAX_EXECUTOR_RETRIES >= 0) { // < 0 disables this application-killing path
+                && appInfo.incrementRetryCount() >= maxExecutorRetries
+                && maxExecutorRetries >= 0) { // < 0 disables this application-killing path
               val execs = appInfo.executors.values
               if (!execs.exists(_.state == ExecutorState.RUNNING)) {
                 logError(s"Application ${appInfo.desc.name} with ID ${appInfo.id} failed " +
@@ -870,8 +872,8 @@ private[deploy] class Master(
       endpointToApp -= app.driver
       addressToApp -= app.driver.address
 
-      if (completedApps.size >= RETAINED_APPLICATIONS) {
-        val toRemove = math.max(RETAINED_APPLICATIONS / 10, 1)
+      if (completedApps.size >= retainedApplications) {
+        val toRemove = math.max(retainedApplications / 10, 1)
         completedApps.take(toRemove).foreach { a =>
           applicationMetricsSystem.removeSource(a.appSource)
         }
@@ -989,14 +991,14 @@ private[deploy] class Master(
   private def timeOutDeadWorkers() {
     // Copy the workers into an array so we don't modify the hashset while iterating through it
     val currentTime = System.currentTimeMillis()
-    val toRemove = workers.filter(_.lastHeartbeat < currentTime - WORKER_TIMEOUT_MS).toArray
+    val toRemove = workers.filter(_.lastHeartbeat < currentTime - workerTimeoutMs).toArray
     for (worker <- toRemove) {
       if (worker.state != WorkerState.DEAD) {
         logWarning("Removing %s because we got no heartbeat in %d seconds".format(
-          worker.id, WORKER_TIMEOUT_MS / 1000))
-        removeWorker(worker, s"Not receiving heartbeat for ${WORKER_TIMEOUT_MS / 1000} seconds")
+          worker.id, workerTimeoutMs / 1000))
+        removeWorker(worker, s"Not receiving heartbeat for ${workerTimeoutMs / 1000} seconds")
       } else {
-        if (worker.lastHeartbeat < currentTime - ((REAPER_ITERATIONS + 1) * WORKER_TIMEOUT_MS)) {
+        if (worker.lastHeartbeat < currentTime - ((reaperIterations + 1) * workerTimeoutMs)) {
           workers -= worker // we've seen this DEAD worker in the UI, etc. for long enough; cull it
         }
       }
@@ -1031,8 +1033,8 @@ private[deploy] class Master(
       case Some(driver) =>
         logInfo(s"Removing driver: $driverId")
         drivers -= driver
-        if (completedDrivers.size >= RETAINED_DRIVERS) {
-          val toRemove = math.max(RETAINED_DRIVERS / 10, 1)
+        if (completedDrivers.size >= retainedDrivers) {
+          val toRemove = math.max(retainedDrivers / 10, 1)
           completedDrivers.trimStart(toRemove)
         }
         completedDrivers += driver
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/RecoveryModeFactory.scala b/core/src/main/scala/org/apache/spark/deploy/master/RecoveryModeFactory.scala
index ffdd635..4707987 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/RecoveryModeFactory.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/RecoveryModeFactory.scala
@@ -20,6 +20,7 @@ package org.apache.spark.deploy.master
 import org.apache.spark.SparkConf
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.Deploy.RECOVERY_DIRECTORY
 import org.apache.spark.serializer.Serializer
 
 /**
@@ -52,11 +53,11 @@ abstract class StandaloneRecoveryModeFactory(conf: SparkConf, serializer: Serial
 private[master] class FileSystemRecoveryModeFactory(conf: SparkConf, serializer: Serializer)
   extends StandaloneRecoveryModeFactory(conf, serializer) with Logging {
 
-  val RECOVERY_DIR = conf.get("spark.deploy.recoveryDirectory", "")
+  val recoveryDir = conf.get(RECOVERY_DIRECTORY)
 
   def createPersistenceEngine(): PersistenceEngine = {
-    logInfo("Persisting recovery state to directory: " + RECOVERY_DIR)
-    new FileSystemPersistenceEngine(RECOVERY_DIR, serializer)
+    logInfo("Persisting recovery state to directory: " + recoveryDir)
+    new FileSystemPersistenceEngine(recoveryDir, serializer)
   }
 
   def createLeaderElectionAgent(master: LeaderElectable): LeaderElectionAgent = {
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala
index 1e8dabf..47f3091 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala
@@ -23,11 +23,12 @@ import org.apache.curator.framework.recipes.leader.{LeaderLatch, LeaderLatchList
 import org.apache.spark.SparkConf
 import org.apache.spark.deploy.SparkCuratorUtil
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.Deploy.ZOOKEEPER_DIRECTORY
 
 private[master] class ZooKeeperLeaderElectionAgent(val masterInstance: LeaderElectable,
     conf: SparkConf) extends LeaderLatchListener with LeaderElectionAgent with Logging  {
 
-  val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark") + "/leader_election"
+  val workingDir = conf.get(ZOOKEEPER_DIRECTORY).getOrElse("/spark") + "/leader_election"
 
   private var zk: CuratorFramework = _
   private var leaderLatch: LeaderLatch = _
@@ -38,7 +39,7 @@ private[master] class ZooKeeperLeaderElectionAgent(val masterInstance: LeaderEle
   private def start() {
     logInfo("Starting ZooKeeper LeaderElection agent")
     zk = SparkCuratorUtil.newClient(conf)
-    leaderLatch = new LeaderLatch(zk, WORKING_DIR)
+    leaderLatch = new LeaderLatch(zk, workingDir)
     leaderLatch.addListener(this)
     leaderLatch.start()
   }
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
index af850e4..73dd0de 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
@@ -28,6 +28,7 @@ import org.apache.zookeeper.CreateMode
 import org.apache.spark.SparkConf
 import org.apache.spark.deploy.SparkCuratorUtil
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.Deploy._
 import org.apache.spark.serializer.Serializer
 
 
@@ -35,22 +36,22 @@ private[master] class ZooKeeperPersistenceEngine(conf: SparkConf, val serializer
   extends PersistenceEngine
   with Logging {
 
-  private val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark") + "/master_status"
+  private val workingDir = conf.get(ZOOKEEPER_DIRECTORY).getOrElse("/spark") + "/master_status"
   private val zk: CuratorFramework = SparkCuratorUtil.newClient(conf)
 
-  SparkCuratorUtil.mkdir(zk, WORKING_DIR)
+  SparkCuratorUtil.mkdir(zk, workingDir)
 
 
   override def persist(name: String, obj: Object): Unit = {
-    serializeIntoFile(WORKING_DIR + "/" + name, obj)
+    serializeIntoFile(workingDir + "/" + name, obj)
   }
 
   override def unpersist(name: String): Unit = {
-    zk.delete().forPath(WORKING_DIR + "/" + name)
+    zk.delete().forPath(workingDir + "/" + name)
   }
 
   override def read[T: ClassTag](prefix: String): Seq[T] = {
-    zk.getChildren.forPath(WORKING_DIR).asScala
+    zk.getChildren.forPath(workingDir).asScala
       .filter(_.startsWith(prefix)).flatMap(deserializeFromFile[T])
   }
 
@@ -66,13 +67,13 @@ private[master] class ZooKeeperPersistenceEngine(conf: SparkConf, val serializer
   }
 
   private def deserializeFromFile[T](filename: String)(implicit m: ClassTag[T]): Option[T] = {
-    val fileData = zk.getData().forPath(WORKING_DIR + "/" + filename)
+    val fileData = zk.getData().forPath(workingDir + "/" + filename)
     try {
       Some(serializer.newInstance().deserialize[T](ByteBuffer.wrap(fileData)))
     } catch {
       case e: Exception =>
         logWarning("Exception while reading persisted file, deleting", e)
-        zk.delete().forPath(WORKING_DIR + "/" + filename)
+        zk.delete().forPath(workingDir + "/" + filename)
         None
     }
   }
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
index a6d13d1..8c2a907 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
@@ -31,6 +31,7 @@ import org.apache.spark.deploy.DeployMessages.DriverStateChanged
 import org.apache.spark.deploy.master.DriverState
 import org.apache.spark.deploy.master.DriverState.DriverState
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.Worker.WORKER_DRIVER_TERMINATE_TIMEOUT
 import org.apache.spark.rpc.RpcEndpointRef
 import org.apache.spark.util.{Clock, ShutdownHookManager, SystemClock, Utils}
 
@@ -57,8 +58,7 @@ private[deploy] class DriverRunner(
   @volatile private[worker] var finalException: Option[Exception] = None
 
   // Timeout to wait for when trying to terminate a driver.
-  private val DRIVER_TERMINATE_TIMEOUT_MS =
-    conf.getTimeAsMs("spark.worker.driverTerminateTimeout", "10s")
+  private val driverTerminateTimeoutMs = conf.get(WORKER_DRIVER_TERMINATE_TIMEOUT)
 
   // Decoupled for testing
   def setClock(_clock: Clock): Unit = {
@@ -122,7 +122,7 @@ private[deploy] class DriverRunner(
     killed = true
     synchronized {
       process.foreach { p =>
-        val exitCode = Utils.terminateProcess(p, DRIVER_TERMINATE_TIMEOUT_MS)
+        val exitCode = Utils.terminateProcess(p, driverTerminateTimeoutMs)
         if (exitCode.isEmpty) {
           logWarning("Failed to terminate driver process: " + p +
               ". This process will likely be orphaned.")
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 8c3593c..115450b 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -39,6 +39,7 @@ import org.apache.spark.deploy.worker.ui.WorkerWebUI
 import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.internal.config.Tests.IS_TESTING
 import org.apache.spark.internal.config.UI._
+import org.apache.spark.internal.config.Worker._
 import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.rpc._
 import org.apache.spark.util.{SparkUncaughtExceptionHandler, ThreadUtils, Utils}
@@ -74,7 +75,7 @@ private[deploy] class Worker(
   // For worker and executor IDs
   private def createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US)
   // Send a heartbeat every (heartbeat timeout) / 4 milliseconds
-  private val HEARTBEAT_MILLIS = conf.getLong("spark.worker.timeout", 60) * 1000 / 4
+  private val HEARTBEAT_MILLIS = conf.get(WORKER_TIMEOUT) * 1000 / 4
 
   // Model retries to connect to the master, after Hadoop's model.
   // The first six attempts to reconnect are in shorter intervals (between 5 and 15 seconds)
@@ -93,13 +94,11 @@ private[deploy] class Worker(
   private val PROLONGED_REGISTRATION_RETRY_INTERVAL_SECONDS = (math.round(60
     * REGISTRATION_RETRY_FUZZ_MULTIPLIER))
 
-  private val CLEANUP_ENABLED = conf.getBoolean("spark.worker.cleanup.enabled", false)
+  private val CLEANUP_ENABLED = conf.get(WORKER_CLEANUP_ENABLED)
   // How often worker will clean up old app folders
-  private val CLEANUP_INTERVAL_MILLIS =
-    conf.getLong("spark.worker.cleanup.interval", 60 * 30) * 1000
+  private val CLEANUP_INTERVAL_MILLIS = conf.get(WORKER_CLEANUP_INTERVAL) * 1000
   // TTL for app folders/data;  after TTL expires it will be cleaned up
-  private val APP_DATA_RETENTION_SECONDS =
-    conf.getLong("spark.worker.cleanup.appDataTtl", 7 * 24 * 3600)
+  private val APP_DATA_RETENTION_SECONDS = conf.get(APP_DATA_RETENTION)
 
   // Whether or not cleanup the non-shuffle files on executor exits.
   private val CLEANUP_NON_SHUFFLE_FILES_ENABLED =
@@ -111,8 +110,7 @@ private[deploy] class Worker(
    * Whether to use the master address in `masterRpcAddresses` if possible. If it's disabled, Worker
    * will just use the address received from Master.
    */
-  private val preferConfiguredMasterAddress =
-    conf.getBoolean("spark.worker.preferConfiguredMasterAddress", false)
+  private val preferConfiguredMasterAddress = conf.get(PREFER_CONFIGURED_MASTER_ADDRESS)
   /**
    * The master address to connect in case of failure. When the connection is broken, worker will
    * use this address to connect. This is usually just one of `masterRpcAddresses`. However, when
@@ -143,10 +141,8 @@ private[deploy] class Worker(
   val appDirectories = new HashMap[String, Seq[String]]
   val finishedApps = new HashSet[String]
 
-  val retainedExecutors = conf.getInt("spark.worker.ui.retainedExecutors",
-    WorkerWebUI.DEFAULT_RETAINED_EXECUTORS)
-  val retainedDrivers = conf.getInt("spark.worker.ui.retainedDrivers",
-    WorkerWebUI.DEFAULT_RETAINED_DRIVERS)
+  val retainedExecutors = conf.get(WORKER_UI_RETAINED_EXECUTORS)
+  val retainedDrivers = conf.get(WORKER_UI_RETAINED_DRIVERS)
 
   // The shuffle service is not actually started unless configured.
   private val shuffleService = if (externalShuffleServiceSupplier != null) {
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala
index 5802812..8c87708 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala
@@ -22,6 +22,7 @@ import java.lang.management.ManagementFactory
 import scala.annotation.tailrec
 
 import org.apache.spark.SparkConf
+import org.apache.spark.internal.config.Worker._
 import org.apache.spark.util.{IntParam, MemoryParam, Utils}
 
 /**
@@ -59,9 +60,7 @@ private[worker] class WorkerArguments(args: Array[String], conf: SparkConf) {
   // This mutates the SparkConf, so all accesses to it must be made after this line
   propertiesFile = Utils.loadDefaultSparkProperties(conf, propertiesFile)
 
-  if (conf.contains("spark.worker.ui.port")) {
-    webUiPort = conf.get("spark.worker.ui.port").toInt
-  }
+  conf.get(WORKER_UI_PORT).foreach { webUiPort = _ }
 
   checkWorkerMemory()
 
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
index 5488695..96980c3 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
@@ -56,6 +56,4 @@ class WorkerWebUI(
 
 private[worker] object WorkerWebUI {
   val STATIC_RESOURCE_BASE = SparkUI.STATIC_RESOURCE_DIR
-  val DEFAULT_RETAINED_DRIVERS = 1000
-  val DEFAULT_RETAINED_EXECUTORS = 1000
 }
diff --git a/core/src/main/scala/org/apache/spark/internal/config/Deploy.scala b/core/src/main/scala/org/apache/spark/internal/config/Deploy.scala
new file mode 100644
index 0000000..ceab957
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/internal/config/Deploy.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.internal.config
+
+private[spark] object Deploy {
+  val RECOVERY_MODE = ConfigBuilder("spark.deploy.recoveryMode")
+    .stringConf
+    .createWithDefault("NONE")
+
+  val RECOVERY_MODE_FACTORY = ConfigBuilder("spark.deploy.recoveryMode.factory")
+    .stringConf
+    .createWithDefault("")
+
+  val RECOVERY_DIRECTORY = ConfigBuilder("spark.deploy.recoveryDirectory")
+    .stringConf
+    .createWithDefault("")
+
+  val ZOOKEEPER_URL = ConfigBuilder("spark.deploy.zookeeper.url")
+    .doc(s"When `${RECOVERY_MODE.key}` is set to ZOOKEEPER, this " +
+      "configuration is used to set the zookeeper URL to connect to.")
+    .stringConf
+    .createOptional
+
+  val ZOOKEEPER_DIRECTORY = ConfigBuilder("spark.deploy.zookeeper.dir")
+    .stringConf
+    .createOptional
+
+  val RETAINED_APPLICATIONS = ConfigBuilder("spark.deploy.retainedApplications")
+    .intConf
+    .createWithDefault(200)
+
+  val RETAINED_DRIVERS = ConfigBuilder("spark.deploy.retainedDrivers")
+    .intConf
+    .createWithDefault(200)
+
+  val REAPER_ITERATIONS = ConfigBuilder("spark.dead.worker.persistence")
+    .intConf
+    .createWithDefault(15)
+
+  val MAX_EXECUTOR_RETRIES = ConfigBuilder("spark.deploy.maxExecutorRetries")
+    .intConf
+    .createWithDefault(10)
+
+  val SPREAD_OUT_APPS = ConfigBuilder("spark.deploy.spreadOut")
+    .booleanConf
+    .createWithDefault(true)
+
+  val DEFAULT_CORES = ConfigBuilder("spark.deploy.defaultCores")
+    .intConf
+    .createWithDefault(Int.MaxValue)
+
+
+}
diff --git a/core/src/main/scala/org/apache/spark/internal/config/Kryo.scala b/core/src/main/scala/org/apache/spark/internal/config/Kryo.scala
new file mode 100644
index 0000000..7873141
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/internal/config/Kryo.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.internal.config
+
+import org.apache.spark.network.util.ByteUnit
+
+private[spark] object Kryo {
+
+  val KRYO_REGISTRATION_REQUIRED = ConfigBuilder("spark.kryo.registrationRequired")
+    .booleanConf
+    .createWithDefault(false)
+
+  val KRYO_USER_REGISTRATORS = ConfigBuilder("spark.kryo.registrator")
+    .stringConf
+    .createOptional
+
+  val KRYO_CLASSES_TO_REGISTER = ConfigBuilder("spark.kryo.classesToRegister")
+    .stringConf
+    .toSequence
+    .createWithDefault(Nil)
+
+  val KRYO_USE_UNSAFE = ConfigBuilder("spark.kyro.unsafe")
+    .booleanConf
+    .createWithDefault(false)
+
+  val KRYO_USE_POOL = ConfigBuilder("spark.kyro.pool")
+    .booleanConf
+    .createWithDefault(true)
+
+  val KRYO_REFERENCE_TRACKING = ConfigBuilder("spark.kryo.referenceTracking")
+    .booleanConf
+    .createWithDefault(true)
+
+  val KRYO_SERIALIZER_BUFFER_SIZE = ConfigBuilder("spark.kryoserializer.buffer")
+    .bytesConf(ByteUnit.KiB)
+    .createWithDefaultString("64k")
+
+  val KRYO_SERIALIZER_MAX_BUFFER_SIZE = ConfigBuilder("spark.kryoserializer.buffer.max")
+    .bytesConf(ByteUnit.MiB)
+    .createWithDefaultString("64m")
+
+}
diff --git a/core/src/main/scala/org/apache/spark/internal/config/Worker.scala b/core/src/main/scala/org/apache/spark/internal/config/Worker.scala
new file mode 100644
index 0000000..47f7167
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/internal/config/Worker.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.internal.config
+
+import java.util.concurrent.TimeUnit
+
+private[spark] object Worker {
+  val WORKER_TIMEOUT = ConfigBuilder("spark.worker.timeout")
+    .longConf
+    .createWithDefault(60)
+
+  val WORKER_DRIVER_TERMINATE_TIMEOUT = ConfigBuilder("spark.worker.driverTerminateTimeout")
+    .timeConf(TimeUnit.MILLISECONDS)
+    .createWithDefaultString("10s")
+
+  val WORKER_CLEANUP_ENABLED = ConfigBuilder("spark.worker.cleanup.enabled")
+    .booleanConf
+    .createWithDefault(false)
+
+  val WORKER_CLEANUP_INTERVAL = ConfigBuilder("spark.worker.cleanup.interval")
+    .longConf
+    .createWithDefault(60 * 30)
+
+  val APP_DATA_RETENTION = ConfigBuilder("spark.worker.cleanup.appDataTtl")
+    .longConf
+    .createWithDefault(7 * 24 * 3600)
+
+  val PREFER_CONFIGURED_MASTER_ADDRESS = ConfigBuilder("spark.worker.preferConfiguredMasterAddress")
+    .booleanConf
+    .createWithDefault(false)
+
+  val WORKER_UI_PORT = ConfigBuilder("spark.worker.ui.port")
+    .intConf
+    .createOptional
+
+  val WORKER_UI_RETAINED_EXECUTORS = ConfigBuilder("spark.worker.ui.retainedExecutors")
+    .intConf
+    .createWithDefault(1000)
+
+  val WORKER_UI_RETAINED_DRIVERS = ConfigBuilder("spark.worker.ui.retainedDrivers")
+    .intConf
+    .createWithDefault(1000)
+
+  val UNCOMPRESSED_LOG_FILE_LENGTH_CACHE_SIZE_CONF =
+    ConfigBuilder("spark.worker.ui.compressedLogFileLengthCacheSize")
+    .intConf
+    .createWithDefault(100)
+}
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 0e78637..99ce220 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -961,4 +961,35 @@ package object config {
       .intConf
       .createWithDefault(4)
 
+  private[spark] val SERIALIZER = ConfigBuilder("spark.serializer")
+    .stringConf
+    .createWithDefault("org.apache.spark.serializer.JavaSerializer")
+
+  private[spark] val SERIALIZER_OBJECT_STREAM_RESET =
+    ConfigBuilder("spark.serializer.objectStreamReset")
+      .intConf
+      .createWithDefault(100)
+
+  private[spark] val SERIALIZER_EXTRA_DEBUG_INFO = ConfigBuilder("spark.serializer.extraDebugInfo")
+    .booleanConf
+    .createWithDefault(true)
+
+  private[spark] val JARS = ConfigBuilder("spark.jars")
+    .stringConf
+    .toSequence
+    .createWithDefault(Nil)
+
+  private[spark] val FILES = ConfigBuilder("spark.files")
+    .stringConf
+    .toSequence
+    .createWithDefault(Nil)
+
+  private[spark] val SUBMIT_DEPLOY_MODE = ConfigBuilder("spark.submit.deployMode")
+    .stringConf
+    .createWithDefault("client")
+
+  private[spark] val SUBMIT_PYTHON_FILES = ConfigBuilder("spark.submit.pyFiles")
+    .stringConf
+    .toSequence
+    .createWithDefault(Nil)
 }
diff --git a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
index f60dcfd..70564ee 100644
--- a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
@@ -24,6 +24,7 @@ import scala.reflect.ClassTag
 
 import org.apache.spark.SparkConf
 import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.internal.config._
 import org.apache.spark.util.{ByteBufferInputStream, ByteBufferOutputStream, Utils}
 
 private[spark] class JavaSerializationStream(
@@ -137,8 +138,8 @@ private[spark] class JavaSerializerInstance(
  */
 @DeveloperApi
 class JavaSerializer(conf: SparkConf) extends Serializer with Externalizable {
-  private var counterReset = conf.getInt("spark.serializer.objectStreamReset", 100)
-  private var extraDebugInfo = conf.getBoolean("spark.serializer.extraDebugInfo", true)
+  private var counterReset = conf.get(SERIALIZER_OBJECT_STREAM_RESET)
+  private var extraDebugInfo = conf.get(SERIALIZER_EXTRA_DEBUG_INFO)
 
   protected def this() = this(new SparkConf())  // For deserialization only
 
diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index 72ca0fb..2df133d 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -39,6 +39,7 @@ import org.roaringbitmap.RoaringBitmap
 import org.apache.spark._
 import org.apache.spark.api.python.PythonBroadcast
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.Kryo._
 import org.apache.spark.network.util.ByteUnit
 import org.apache.spark.scheduler.{CompressedMapStatus, HighlyCompressedMapStatus}
 import org.apache.spark.storage._
@@ -58,34 +59,34 @@ class KryoSerializer(conf: SparkConf)
   with Logging
   with Serializable {
 
-  private val bufferSizeKb = conf.getSizeAsKb("spark.kryoserializer.buffer", "64k")
+  private val bufferSizeKb = conf.get(KRYO_SERIALIZER_BUFFER_SIZE)
 
   if (bufferSizeKb >= ByteUnit.GiB.toKiB(2)) {
-    throw new IllegalArgumentException("spark.kryoserializer.buffer must be less than " +
+    throw new IllegalArgumentException(s"${KRYO_SERIALIZER_BUFFER_SIZE.key} must be less than " +
       s"2048 MiB, got: + ${ByteUnit.KiB.toMiB(bufferSizeKb)} MiB.")
   }
   private val bufferSize = ByteUnit.KiB.toBytes(bufferSizeKb).toInt
 
-  val maxBufferSizeMb = conf.getSizeAsMb("spark.kryoserializer.buffer.max", "64m").toInt
+  val maxBufferSizeMb = conf.get(KRYO_SERIALIZER_MAX_BUFFER_SIZE).toInt
   if (maxBufferSizeMb >= ByteUnit.GiB.toMiB(2)) {
-    throw new IllegalArgumentException("spark.kryoserializer.buffer.max must be less than " +
-      s"2048 MiB, got: + $maxBufferSizeMb MiB.")
+    throw new IllegalArgumentException(s"${KRYO_SERIALIZER_MAX_BUFFER_SIZE.key} must be less " +
+      s"than 2048 MiB, got: $maxBufferSizeMb MiB.")
   }
   private val maxBufferSize = ByteUnit.MiB.toBytes(maxBufferSizeMb).toInt
 
-  private val referenceTracking = conf.getBoolean("spark.kryo.referenceTracking", true)
-  private val registrationRequired = conf.getBoolean("spark.kryo.registrationRequired", false)
-  private val userRegistrators = conf.get("spark.kryo.registrator", "")
-    .split(',').map(_.trim)
+  private val referenceTracking = conf.get(KRYO_REFERENCE_TRACKING)
+  private val registrationRequired = conf.get(KRYO_REGISTRATION_REQUIRED)
+  private val userRegistrators = conf.get(KRYO_USER_REGISTRATORS)
+    .map(_.trim)
     .filter(!_.isEmpty)
-  private val classesToRegister = conf.get("spark.kryo.classesToRegister", "")
-    .split(',').map(_.trim)
+  private val classesToRegister = conf.get(KRYO_CLASSES_TO_REGISTER)
+    .map(_.trim)
     .filter(!_.isEmpty)
 
   private val avroSchemas = conf.getAvroSchema
   // whether to use unsafe based IO for serialization
-  private val useUnsafe = conf.getBoolean("spark.kryo.unsafe", false)
-  private val usePool = conf.getBoolean("spark.kryo.pool", true)
+  private val useUnsafe = conf.get(KRYO_USE_UNSAFE)
+  private val usePool = conf.get(KRYO_USE_POOL)
 
   def newKryoOutput(): KryoOutput =
     if (useUnsafe) {
@@ -407,7 +408,7 @@ private[spark] class KryoSerializerInstance(
     } catch {
       case e: KryoException if e.getMessage.startsWith("Buffer overflow") =>
         throw new SparkException(s"Kryo serialization failed: ${e.getMessage}. To avoid this, " +
-          "increase spark.kryoserializer.buffer.max value.", e)
+          s"increase ${KRYO_SERIALIZER_MAX_BUFFER_SIZE.key} value.", e)
     } finally {
       releaseKryo(kryo)
     }
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 83d1b2b..7416559 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -62,6 +62,7 @@ import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.Tests.IS_TESTING
 import org.apache.spark.internal.config.UI._
+import org.apache.spark.internal.config.Worker._
 import org.apache.spark.launcher.SparkLauncher
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance}
@@ -1457,16 +1458,12 @@ private[spark] object Utils extends Logging {
     CallSite(shortForm, longForm)
   }
 
-  private val UNCOMPRESSED_LOG_FILE_LENGTH_CACHE_SIZE_CONF =
-    "spark.worker.ui.compressedLogFileLengthCacheSize"
-  private val DEFAULT_UNCOMPRESSED_LOG_FILE_LENGTH_CACHE_SIZE = 100
   private var compressedLogFileLengthCache: LoadingCache[String, java.lang.Long] = null
   private def getCompressedLogFileLengthCache(
       sparkConf: SparkConf): LoadingCache[String, java.lang.Long] = this.synchronized {
     if (compressedLogFileLengthCache == null) {
-      val compressedLogFileLengthCacheSize = sparkConf.getInt(
-        UNCOMPRESSED_LOG_FILE_LENGTH_CACHE_SIZE_CONF,
-        DEFAULT_UNCOMPRESSED_LOG_FILE_LENGTH_CACHE_SIZE)
+      val compressedLogFileLengthCacheSize = sparkConf.get(
+        UNCOMPRESSED_LOG_FILE_LENGTH_CACHE_SIZE_CONF)
       compressedLogFileLengthCache = CacheBuilder.newBuilder()
         .maximumSize(compressedLogFileLengthCacheSize)
         .build[String, java.lang.Long](new CacheLoader[String, java.lang.Long]() {
@@ -2535,8 +2532,7 @@ private[spark] object Utils extends Logging {
    * has its own mechanism to distribute jars.
    */
   def getUserJars(conf: SparkConf): Seq[String] = {
-    val sparkJars = conf.getOption("spark.jars")
-    sparkJars.map(_.split(",")).map(_.filter(_.nonEmpty)).toSeq.flatten
+    conf.get(JARS).filter(_.nonEmpty)
   }
 
   /**
diff --git a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
index 61da413..f8adaf5 100644
--- a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
@@ -27,6 +27,7 @@ import scala.concurrent.duration._
 import org.scalatest.BeforeAndAfter
 import org.scalatest.Matchers
 
+import org.apache.spark.internal.config.Deploy._
 import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted, SparkListenerTaskEnd, SparkListenerTaskStart}
 import org.apache.spark.util.ThreadUtils
 
@@ -256,7 +257,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
       .set("spark.task.reaper.enabled", "true")
       .set("spark.task.reaper.killTimeout", "-1")
       .set("spark.task.reaper.PollingInterval", "1s")
-      .set("spark.deploy.maxExecutorRetries", "1")
+      .set(MAX_EXECUTOR_RETRIES, 1)
     sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
 
     // Add a listener to release the semaphore once any tasks are launched.
diff --git a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
index ffa7042..3203f8f 100644
--- a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
@@ -23,6 +23,7 @@ import java.util.concurrent.{Callable, CyclicBarrier, Executors, ExecutorService
 import org.scalatest.Matchers
 
 import org.apache.spark.ShuffleSuite.NonJavaSerializableClass
+import org.apache.spark.internal.config.SERIALIZER
 import org.apache.spark.internal.config.Tests.TEST_NO_STAGE_RETRY
 import org.apache.spark.memory.TaskMemoryManager
 import org.apache.spark.rdd.{CoGroupedRDD, OrderedRDDFunctions, RDD, ShuffledRDD, SubtractedRDD}
@@ -215,7 +216,7 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
 
   test("sort with Java non serializable class - Kryo") {
     // Use a local cluster with 2 processes to make sure there are both local and remote blocks
-    val myConf = conf.clone().set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+    val myConf = conf.clone().set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer")
     sc = new SparkContext("local-cluster[2,1,1024]", "test", myConf)
     val a = sc.parallelize(1 to 10, 2)
     val b = a.map { x =>
diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
index a8849ab..4071dd4 100644
--- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
@@ -28,6 +28,7 @@ import com.esotericsoftware.kryo.Kryo
 
 import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.History._
+import org.apache.spark.internal.config.Kryo._
 import org.apache.spark.network.util.ByteUnit
 import org.apache.spark.serializer.{JavaSerializer, KryoRegistrator, KryoSerializer}
 import org.apache.spark.util.{ResetSystemProperties, RpcUtils}
@@ -78,7 +79,7 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst
     assert(conf.get("spark.master") === "local[3]")
     assert(conf.get("spark.app.name") === "My app")
     assert(conf.get("spark.home") === "/path")
-    assert(conf.get("spark.jars") === "a.jar,b.jar")
+    assert(conf.get(JARS) === Seq("a.jar", "b.jar"))
     assert(conf.get("spark.executorEnv.VAR1") === "value1")
     assert(conf.get("spark.executorEnv.VAR2") === "value2")
     assert(conf.get("spark.executorEnv.VAR3") === "value3")
@@ -86,7 +87,7 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst
     // Test the Java-friendly versions of these too
     conf.setJars(Array("c.jar", "d.jar"))
     conf.setExecutorEnv(Array(("VAR4", "value4"), ("VAR5", "value5")))
-    assert(conf.get("spark.jars") === "c.jar,d.jar")
+    assert(conf.get(JARS) === Seq("c.jar", "d.jar"))
     assert(conf.get("spark.executorEnv.VAR4") === "value4")
     assert(conf.get("spark.executorEnv.VAR5") === "value5")
   }
@@ -182,19 +183,19 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst
   }
 
   test("register kryo classes through registerKryoClasses") {
-    val conf = new SparkConf().set("spark.kryo.registrationRequired", "true")
+    val conf = new SparkConf().set(KRYO_REGISTRATION_REQUIRED, true)
 
     conf.registerKryoClasses(Array(classOf[Class1], classOf[Class2]))
-    assert(conf.get("spark.kryo.classesToRegister") ===
-      classOf[Class1].getName + "," + classOf[Class2].getName)
+    assert(conf.get(KRYO_CLASSES_TO_REGISTER).toSet ===
+      Seq(classOf[Class1].getName, classOf[Class2].getName).toSet)
 
     conf.registerKryoClasses(Array(classOf[Class3]))
-    assert(conf.get("spark.kryo.classesToRegister") ===
-      classOf[Class1].getName + "," + classOf[Class2].getName + "," + classOf[Class3].getName)
+    assert(conf.get(KRYO_CLASSES_TO_REGISTER).toSet ===
+      Seq(classOf[Class1].getName, classOf[Class2].getName, classOf[Class3].getName).toSet)
 
     conf.registerKryoClasses(Array(classOf[Class2]))
-    assert(conf.get("spark.kryo.classesToRegister") ===
-      classOf[Class1].getName + "," + classOf[Class2].getName + "," + classOf[Class3].getName)
+    assert(conf.get(KRYO_CLASSES_TO_REGISTER).toSet ===
+      Seq(classOf[Class1].getName, classOf[Class2].getName, classOf[Class3].getName).toSet)
 
     // Kryo doesn't expose a way to discover registered classes, but at least make sure this doesn't
     // blow up.
@@ -205,12 +206,12 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst
   }
 
   test("register kryo classes through registerKryoClasses and custom registrator") {
-    val conf = new SparkConf().set("spark.kryo.registrationRequired", "true")
+    val conf = new SparkConf().set(KRYO_REGISTRATION_REQUIRED, true)
 
     conf.registerKryoClasses(Array(classOf[Class1]))
-    assert(conf.get("spark.kryo.classesToRegister") === classOf[Class1].getName)
+    assert(conf.get(KRYO_CLASSES_TO_REGISTER).toSet === Seq(classOf[Class1].getName).toSet)
 
-    conf.set("spark.kryo.registrator", classOf[CustomRegistrator].getName)
+    conf.set(KRYO_USER_REGISTRATORS, classOf[CustomRegistrator].getName)
 
     // Kryo doesn't expose a way to discover registered classes, but at least make sure this doesn't
     // blow up.
@@ -220,9 +221,9 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst
   }
 
   test("register kryo classes through conf") {
-    val conf = new SparkConf().set("spark.kryo.registrationRequired", "true")
-    conf.set("spark.kryo.classesToRegister", "java.lang.StringBuffer")
-    conf.set("spark.serializer", classOf[KryoSerializer].getName)
+    val conf = new SparkConf().set(KRYO_REGISTRATION_REQUIRED, true)
+    conf.set(KRYO_CLASSES_TO_REGISTER, Seq("java.lang.StringBuffer"))
+    conf.set(SERIALIZER, classOf[KryoSerializer].getName)
 
     // Kryo doesn't expose a way to discover registered classes, but at least make sure this doesn't
     // blow up.
diff --git a/core/src/test/scala/org/apache/spark/api/python/PythonBroadcastSuite.scala b/core/src/test/scala/org/apache/spark/api/python/PythonBroadcastSuite.scala
index 7407a65..24004de 100644
--- a/core/src/test/scala/org/apache/spark/api/python/PythonBroadcastSuite.scala
+++ b/core/src/test/scala/org/apache/spark/api/python/PythonBroadcastSuite.scala
@@ -24,6 +24,7 @@ import scala.io.Source
 import org.scalatest.Matchers
 
 import org.apache.spark.{SharedSparkContext, SparkConf, SparkFunSuite}
+import org.apache.spark.internal.config.Kryo._
 import org.apache.spark.serializer.KryoSerializer
 import org.apache.spark.util.Utils
 
@@ -48,7 +49,7 @@ class PythonBroadcastSuite extends SparkFunSuite with Matchers with SharedSparkC
       }
       val broadcast = new PythonBroadcast(broadcastDataFile.getAbsolutePath)
       assertBroadcastIsValid(broadcast)
-      val conf = new SparkConf().set("spark.kryo.registrationRequired", "true")
+      val conf = new SparkConf().set(KRYO_REGISTRATION_REQUIRED, true)
       val deserializedBroadcast =
         Utils.clone[PythonBroadcast](broadcast, new KryoSerializer(conf).newInstance())
       assertBroadcastIsValid(deserializedBroadcast)
diff --git a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
index 6d74812..18ec60d 100644
--- a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
+++ b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
@@ -25,6 +25,7 @@ import org.scalatest.Assertions
 
 import org.apache.spark._
 import org.apache.spark.internal.config
+import org.apache.spark.internal.config.SERIALIZER
 import org.apache.spark.io.SnappyCompressionCodec
 import org.apache.spark.rdd.RDD
 import org.apache.spark.security.EncryptionFunSuite
@@ -68,7 +69,7 @@ class BroadcastSuite extends SparkFunSuite with LocalSparkContext with Encryptio
 
   encryptionTest("Accessing TorrentBroadcast variables in a local cluster") { conf =>
     val numSlaves = 4
-    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+    conf.set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer")
     conf.set(config.BROADCAST_COMPRESS, true)
     sc = new SparkContext("local-cluster[%d, 1, 1024]".format(numSlaves), "test", conf)
     val list = List[Int](1, 2, 3, 4)
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index c6e961e..30efbb0 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -221,7 +221,7 @@ class SparkSubmitSuite
     val (_, _, conf, _) = submit.prepareSubmitEnvironment(appArgs)
 
     appArgs.deployMode should be ("client")
-    conf.get("spark.submit.deployMode") should be ("client")
+    conf.get(SUBMIT_DEPLOY_MODE) should be ("client")
 
     // Both cmd line and configuration are specified, cmdline option takes the priority
     val clArgs1 = Seq(
@@ -235,7 +235,7 @@ class SparkSubmitSuite
     val (_, _, conf1, _) = submit.prepareSubmitEnvironment(appArgs1)
 
     appArgs1.deployMode should be ("cluster")
-    conf1.get("spark.submit.deployMode") should be ("cluster")
+    conf1.get(SUBMIT_DEPLOY_MODE) should be ("cluster")
 
     // Neither cmdline nor configuration are specified, client mode is the default choice
     val clArgs2 = Seq(
@@ -248,7 +248,7 @@ class SparkSubmitSuite
 
     val (_, _, conf2, _) = submit.prepareSubmitEnvironment(appArgs2)
     appArgs2.deployMode should be ("client")
-    conf2.get("spark.submit.deployMode") should be ("client")
+    conf2.get(SUBMIT_DEPLOY_MODE) should be ("client")
   }
 
   test("handles YARN cluster mode") {
@@ -374,12 +374,12 @@ class SparkSubmitSuite
     val confMap = conf.getAll.toMap
     confMap.keys should contain ("spark.master")
     confMap.keys should contain ("spark.app.name")
-    confMap.keys should contain ("spark.jars")
+    confMap.keys should contain (JARS.key)
     confMap.keys should contain ("spark.driver.memory")
     confMap.keys should contain ("spark.driver.cores")
     confMap.keys should contain ("spark.driver.supervise")
     confMap.keys should contain (UI_ENABLED.key)
-    confMap.keys should contain ("spark.submit.deployMode")
+    confMap.keys should contain (SUBMIT_DEPLOY_MODE.key)
     conf.get(UI_ENABLED) should be (false)
   }
 
@@ -467,7 +467,7 @@ class SparkSubmitSuite
     val (_, _, conf, mainClass) = submit.prepareSubmitEnvironment(appArgs)
     conf.get("spark.executor.memory") should be ("5g")
     conf.get("spark.master") should be ("yarn")
-    conf.get("spark.submit.deployMode") should be ("cluster")
+    conf.get(SUBMIT_DEPLOY_MODE) should be ("cluster")
     mainClass should be (SparkSubmit.YARN_CLUSTER_SUBMIT_CLASS)
   }
 
@@ -662,7 +662,7 @@ class SparkSubmitSuite
       val (_, _, conf, _) = submit.prepareSubmitEnvironment(appArgs)
       appArgs.jars should be(Utils.resolveURIs(jars))
       appArgs.files should be(Utils.resolveURIs(files))
-      conf.get("spark.jars") should be(Utils.resolveURIs(jars + ",thejar.jar"))
+      conf.get(JARS) should be(Utils.resolveURIs(jars + ",thejar.jar").split(",").toSeq)
       conf.get("spark.files") should be(Utils.resolveURIs(files))
 
       // Test files and archives (Yarn)
@@ -692,8 +692,8 @@ class SparkSubmitSuite
       val appArgs3 = new SparkSubmitArguments(clArgs3)
       val (_, _, conf3, _) = submit.prepareSubmitEnvironment(appArgs3)
       appArgs3.pyFiles should be(Utils.resolveURIs(pyFiles))
-      conf3.get("spark.submit.pyFiles") should be(
-        PythonRunner.formatPaths(Utils.resolveURIs(pyFiles)).mkString(","))
+      conf3.get(SUBMIT_PYTHON_FILES) should be(
+        PythonRunner.formatPaths(Utils.resolveURIs(pyFiles)))
       conf3.get(PYSPARK_DRIVER_PYTHON.key) should be("python3.4")
       conf3.get(PYSPARK_PYTHON.key) should be("python3.5")
     }
@@ -744,8 +744,8 @@ class SparkSubmitSuite
       )
       val appArgs = new SparkSubmitArguments(clArgs)
       val (_, _, conf, _) = submit.prepareSubmitEnvironment(appArgs)
-      conf.get("spark.jars") should be(Utils.resolveURIs(jars + ",thejar.jar"))
-      conf.get("spark.files") should be(Utils.resolveURIs(files))
+      conf.get(JARS) should be(Utils.resolveURIs(jars + ",thejar.jar").split(",").toSeq)
+      conf.get(FILES) should be(Utils.resolveURIs(files).split(",").toSeq)
 
       // Test files and archives (Yarn)
       val f2 = File.createTempFile("test-submit-files-archives", "", tmpDir)
@@ -776,8 +776,8 @@ class SparkSubmitSuite
       )
       val appArgs3 = new SparkSubmitArguments(clArgs3)
       val (_, _, conf3, _) = submit.prepareSubmitEnvironment(appArgs3)
-      conf3.get("spark.submit.pyFiles") should be(
-        PythonRunner.formatPaths(Utils.resolveURIs(pyFiles)).mkString(","))
+      conf3.get(SUBMIT_PYTHON_FILES) should be(
+        PythonRunner.formatPaths(Utils.resolveURIs(pyFiles)))
 
       // Test remote python files
       val hadoopConf = new Configuration()
@@ -798,7 +798,7 @@ class SparkSubmitSuite
       val appArgs4 = new SparkSubmitArguments(clArgs4)
       val (_, _, conf4, _) = submit.prepareSubmitEnvironment(appArgs4, conf = Some(hadoopConf))
       // Should not format python path for yarn cluster mode
-      conf4.get("spark.submit.pyFiles") should be(Utils.resolveURIs(remotePyFiles))
+      conf4.get(SUBMIT_PYTHON_FILES) should be(Utils.resolveURIs(remotePyFiles).split(","))
     }
   }
 
@@ -1024,7 +1024,7 @@ class SparkSubmitSuite
       conf.get("spark.repl.local.jars") should (startWith("file:"))
 
       // local py files should not be a URI format.
-      conf.get("spark.submit.pyFiles") should (startWith("/"))
+      conf.get(SUBMIT_PYTHON_FILES).foreach { _ should (startWith("/")) }
     }
   }
 
@@ -1155,7 +1155,7 @@ class SparkSubmitSuite
       val (_, _, conf, _) = submit.prepareSubmitEnvironment(appArgs, conf = Some(hadoopConf))
 
       conf.get(PY_FILES.key) should be(s"s3a://${pyFile.getAbsolutePath}")
-      conf.get("spark.submit.pyFiles") should (startWith("/"))
+      conf.get(SUBMIT_PYTHON_FILES).foreach { _ should (startWith("/")) }
 
       // Verify "spark.submit.pyFiles"
       val args1 = Seq(
@@ -1171,7 +1171,7 @@ class SparkSubmitSuite
       val (_, _, conf1, _) = submit.prepareSubmitEnvironment(appArgs1, conf = Some(hadoopConf))
 
       conf1.get(PY_FILES.key) should be(s"s3a://${pyFile.getAbsolutePath}")
-      conf1.get("spark.submit.pyFiles") should (startWith("/"))
+      conf.get(SUBMIT_PYTHON_FILES).foreach { _ should (startWith("/")) }
     }
   }
 
diff --git a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
index 5904d03..fbf2acc 100644
--- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
@@ -40,7 +40,9 @@ import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
 import org.apache.spark.deploy._
 import org.apache.spark.deploy.DeployMessages._
 import org.apache.spark.internal.config._
+import org.apache.spark.internal.config.Deploy._
 import org.apache.spark.internal.config.UI._
+import org.apache.spark.internal.config.Worker._
 import org.apache.spark.rpc.{RpcAddress, RpcEndpoint, RpcEndpointRef, RpcEnv}
 import org.apache.spark.serializer
 
@@ -103,9 +105,8 @@ class MasterSuite extends SparkFunSuite
 
   test("can use a custom recovery mode factory") {
     val conf = new SparkConf(loadDefaults = false)
-    conf.set("spark.deploy.recoveryMode", "CUSTOM")
-    conf.set("spark.deploy.recoveryMode.factory",
-      classOf[CustomRecoveryModeFactory].getCanonicalName)
+    conf.set(RECOVERY_MODE, "CUSTOM")
+    conf.set(RECOVERY_MODE_FACTORY, classOf[CustomRecoveryModeFactory].getCanonicalName)
     conf.set(MASTER_REST_SERVER_ENABLED, false)
 
     val instantiationAttempts = CustomRecoveryModeFactory.instantiationAttempts
@@ -188,9 +189,8 @@ class MasterSuite extends SparkFunSuite
 
   test("master correctly recover the application") {
     val conf = new SparkConf(loadDefaults = false)
-    conf.set("spark.deploy.recoveryMode", "CUSTOM")
-    conf.set("spark.deploy.recoveryMode.factory",
-      classOf[FakeRecoveryModeFactory].getCanonicalName)
+    conf.set(RECOVERY_MODE, "CUSTOM")
+    conf.set(RECOVERY_MODE_FACTORY, classOf[FakeRecoveryModeFactory].getCanonicalName)
     conf.set(MASTER_REST_SERVER_ENABLED, false)
 
     val fakeAppInfo = makeAppInfo(1024)
@@ -637,7 +637,7 @@ class MasterSuite extends SparkFunSuite
   }
 
   test("SPARK-19900: there should be a corresponding driver for the app after relaunching driver") {
-    val conf = new SparkConf().set("spark.worker.timeout", "1")
+    val conf = new SparkConf().set(WORKER_TIMEOUT, 1L)
     val master = makeMaster(conf)
     master.rpcEnv.setupEndpoint(Master.ENDPOINT_NAME, master)
     eventually(timeout(10.seconds)) {
diff --git a/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala
index 3027865..3d8a46b 100644
--- a/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala
@@ -24,6 +24,7 @@ import org.apache.commons.lang3.RandomUtils
 import org.apache.curator.test.TestingServer
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
+import org.apache.spark.internal.config.Deploy.ZOOKEEPER_URL
 import org.apache.spark.rpc.{RpcEndpoint, RpcEnv}
 import org.apache.spark.serializer.{JavaSerializer, Serializer}
 import org.apache.spark.util.Utils
@@ -48,7 +49,7 @@ class PersistenceEngineSuite extends SparkFunSuite {
     val zkTestServer = new TestingServer(findFreePort(conf))
     try {
       testPersistenceEngine(conf, serializer => {
-        conf.set("spark.deploy.zookeeper.url", zkTestServer.getConnectString)
+        conf.set(ZOOKEEPER_URL, zkTestServer.getConnectString)
         new ZooKeeperPersistenceEngine(conf, serializer)
       })
     } finally {
diff --git a/core/src/test/scala/org/apache/spark/deploy/rest/SubmitRestProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/rest/SubmitRestProtocolSuite.scala
index 75c50af..87655f3 100644
--- a/core/src/test/scala/org/apache/spark/deploy/rest/SubmitRestProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/rest/SubmitRestProtocolSuite.scala
@@ -22,6 +22,7 @@ import java.lang.Boolean
 import org.json4s.jackson.JsonMethods._
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.internal.config._
 import org.apache.spark.util.Utils
 
 /**
@@ -93,7 +94,7 @@ class SubmitRestProtocolSuite extends SparkFunSuite {
     message.sparkProperties = conf.getAll.toMap
     message.validate()
     // optional fields
-    conf.set("spark.jars", "mayonnaise.jar,ketchup.jar")
+    conf.set(JARS, Seq("mayonnaise.jar", "ketchup.jar"))
     conf.set("spark.files", "fireball.png")
     conf.set("spark.driver.memory", s"${Utils.DEFAULT_DRIVER_MEM_MB}m")
     conf.set("spark.driver.cores", "180")
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala
index e5e5b5e..0ddf38c 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala
@@ -32,6 +32,7 @@ import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
 import org.apache.spark.deploy.{Command, ExecutorState, ExternalShuffleService}
 import org.apache.spark.deploy.DeployMessages.{DriverStateChanged, ExecutorStateChanged}
 import org.apache.spark.deploy.master.DriverState
+import org.apache.spark.internal.config.Worker._
 import org.apache.spark.rpc.{RpcAddress, RpcEnv}
 
 class WorkerSuite extends SparkFunSuite with Matchers with BeforeAndAfter {
@@ -100,7 +101,7 @@ class WorkerSuite extends SparkFunSuite with Matchers with BeforeAndAfter {
 
   test("test clearing of finishedExecutors (small number of executors)") {
     val conf = new SparkConf()
-    conf.set("spark.worker.ui.retainedExecutors", 2.toString)
+    conf.set(WORKER_UI_RETAINED_EXECUTORS, 2)
     val worker = makeWorker(conf)
     // initialize workers
     for (i <- 0 until 5) {
@@ -124,7 +125,7 @@ class WorkerSuite extends SparkFunSuite with Matchers with BeforeAndAfter {
 
   test("test clearing of finishedExecutors (more executors)") {
     val conf = new SparkConf()
-    conf.set("spark.worker.ui.retainedExecutors", 30.toString)
+    conf.set(WORKER_UI_RETAINED_EXECUTORS, 30)
     val worker = makeWorker(conf)
     // initialize workers
     for (i <- 0 until 50) {
@@ -157,7 +158,7 @@ class WorkerSuite extends SparkFunSuite with Matchers with BeforeAndAfter {
 
   test("test clearing of finishedDrivers (small number of drivers)") {
     val conf = new SparkConf()
-    conf.set("spark.worker.ui.retainedDrivers", 2.toString)
+    conf.set(WORKER_UI_RETAINED_DRIVERS, 2)
     val worker = makeWorker(conf)
     // initialize workers
     for (i <- 0 until 5) {
@@ -181,7 +182,7 @@ class WorkerSuite extends SparkFunSuite with Matchers with BeforeAndAfter {
 
   test("test clearing of finishedDrivers (more drivers)") {
     val conf = new SparkConf()
-    conf.set("spark.worker.ui.retainedDrivers", 30.toString)
+    conf.set(WORKER_UI_RETAINED_DRIVERS, 30)
     val worker = makeWorker(conf)
     // initialize workers
     for (i <- 0 until 50) {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
index f41ffb7..c1e7fb9 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
@@ -181,7 +181,7 @@ class MapStatusSuite extends SparkFunSuite {
 
   test("SPARK-21133 HighlyCompressedMapStatus#writeExternal throws NPE") {
     val conf = new SparkConf()
-      .set("spark.serializer", classOf[KryoSerializer].getName)
+      .set(config.SERIALIZER, classOf[KryoSerializer].getName)
       .setMaster("local")
       .setAppName("SPARK-21133")
     withSpark(new SparkContext(conf)) { sc =>
diff --git a/core/src/test/scala/org/apache/spark/serializer/GenericAvroSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/GenericAvroSerializerSuite.scala
index 3734f1c..8610b18 100644
--- a/core/src/test/scala/org/apache/spark/serializer/GenericAvroSerializerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/GenericAvroSerializerSuite.scala
@@ -25,9 +25,10 @@ import org.apache.avro.{Schema, SchemaBuilder}
 import org.apache.avro.generic.GenericData.Record
 
 import org.apache.spark.{SharedSparkContext, SparkFunSuite}
+import org.apache.spark.internal.config.SERIALIZER
 
 class GenericAvroSerializerSuite extends SparkFunSuite with SharedSparkContext {
-  conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+  conf.set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer")
 
   val schema : Schema = SchemaBuilder
     .record("testRecord").fields()
diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoBenchmark.scala b/core/src/test/scala/org/apache/spark/serializer/KryoBenchmark.scala
index d7730f2..fd228cd 100644
--- a/core/src/test/scala/org/apache/spark/serializer/KryoBenchmark.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/KryoBenchmark.scala
@@ -22,6 +22,8 @@ import scala.util.Random
 
 import org.apache.spark.SparkConf
 import org.apache.spark.benchmark.{Benchmark, BenchmarkBase}
+import org.apache.spark.internal.config._
+import org.apache.spark.internal.config.Kryo._
 import org.apache.spark.serializer.KryoTest._
 
 /**
@@ -122,9 +124,9 @@ object KryoBenchmark extends BenchmarkBase {
 
   def createSerializer(useUnsafe: Boolean): SerializerInstance = {
     val conf = new SparkConf()
-    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
-    conf.set("spark.kryo.registrator", classOf[MyRegistrator].getName)
-    conf.set("spark.kryo.unsafe", useUnsafe.toString)
+    conf.set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer")
+    conf.set(KRYO_USER_REGISTRATORS, classOf[MyRegistrator].getName)
+    conf.set(KRYO_USE_UNSAFE, useUnsafe)
 
     new KryoSerializer(conf).newInstance()
   }
diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerBenchmark.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerBenchmark.scala
index 2a15c6f..2915b99 100644
--- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerBenchmark.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerBenchmark.scala
@@ -23,6 +23,8 @@ import scala.concurrent.duration._
 
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.benchmark.{Benchmark, BenchmarkBase}
+import org.apache.spark.internal.config._
+import org.apache.spark.internal.config.Kryo._
 import org.apache.spark.serializer.KryoTest._
 import org.apache.spark.util.ThreadUtils
 
@@ -69,9 +71,9 @@ object KryoSerializerBenchmark extends BenchmarkBase {
 
   def createSparkContext(usePool: Boolean): SparkContext = {
     val conf = new SparkConf()
-    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
-    conf.set("spark.kryo.registrator", classOf[MyRegistrator].getName)
-    conf.set("spark.kryo.pool", usePool.toString)
+    conf.set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer")
+    conf.set(KRYO_USER_REGISTRATORS, classOf[MyRegistrator].getName)
+    conf.set(KRYO_USE_POOL, usePool)
 
     if (sc != null) {
       sc.stop()
diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerDistributedSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerDistributedSuite.scala
index 46aa9c3..ae87109 100644
--- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerDistributedSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerDistributedSuite.scala
@@ -28,8 +28,8 @@ class KryoSerializerDistributedSuite extends SparkFunSuite with LocalSparkContex
 
   test("kryo objects are serialised consistently in different processes") {
     val conf = new SparkConf(false)
-      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
-      .set("spark.kryo.registrator", classOf[AppJarRegistrator].getName)
+      .set(config.SERIALIZER, "org.apache.spark.serializer.KryoSerializer")
+      .set(config.Kryo.KRYO_USER_REGISTRATORS, classOf[AppJarRegistrator].getName)
       .set(config.MAX_TASK_FAILURES, 1)
       .set(config.BLACKLIST_ENABLED, false)
 
diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerResizableOutputSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerResizableOutputSuite.scala
index cf01f79..25f0b19 100644
--- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerResizableOutputSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerResizableOutputSuite.scala
@@ -21,6 +21,8 @@ import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.LocalSparkContext._
 import org.apache.spark.SparkContext
 import org.apache.spark.SparkException
+import org.apache.spark.internal.config._
+import org.apache.spark.internal.config.Kryo._
 
 class KryoSerializerResizableOutputSuite extends SparkFunSuite {
 
@@ -29,9 +31,9 @@ class KryoSerializerResizableOutputSuite extends SparkFunSuite {
 
   test("kryo without resizable output buffer should fail on large array") {
     val conf = new SparkConf(false)
-    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
-    conf.set("spark.kryoserializer.buffer", "1m")
-    conf.set("spark.kryoserializer.buffer.max", "1m")
+    conf.set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer")
+    conf.set(KRYO_SERIALIZER_BUFFER_SIZE.key, "1m")
+    conf.set(KRYO_SERIALIZER_MAX_BUFFER_SIZE.key, "1m")
     withSpark(new SparkContext("local", "test", conf)) { sc =>
       intercept[SparkException](sc.parallelize(x).collect())
     }
@@ -39,9 +41,9 @@ class KryoSerializerResizableOutputSuite extends SparkFunSuite {
 
   test("kryo with resizable output buffer should succeed on large array") {
     val conf = new SparkConf(false)
-    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
-    conf.set("spark.kryoserializer.buffer", "1m")
-    conf.set("spark.kryoserializer.buffer.max", "2m")
+    conf.set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer")
+    conf.set(KRYO_SERIALIZER_BUFFER_SIZE.key, "1m")
+    conf.set(KRYO_SERIALIZER_MAX_BUFFER_SIZE.key, "2m")
     withSpark(new SparkContext("local", "test", conf)) { sc =>
       assert(sc.parallelize(x).collect() === x)
     }
diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
index 8af5327..41fb405 100644
--- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
@@ -32,19 +32,21 @@ import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
 import org.roaringbitmap.RoaringBitmap
 
 import org.apache.spark.{SharedSparkContext, SparkConf, SparkFunSuite}
+import org.apache.spark.internal.config._
+import org.apache.spark.internal.config.Kryo._
 import org.apache.spark.scheduler.HighlyCompressedMapStatus
 import org.apache.spark.serializer.KryoTest._
 import org.apache.spark.storage.BlockManagerId
 import org.apache.spark.util.{ThreadUtils, Utils}
 
 class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
-  conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
-  conf.set("spark.kryo.registrator", classOf[MyRegistrator].getName)
-  conf.set("spark.kryo.unsafe", "false")
+  conf.set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer")
+  conf.set(KRYO_USER_REGISTRATORS, classOf[MyRegistrator].getName)
+  conf.set(KRYO_USE_UNSAFE, false)
 
   test("SPARK-7392 configuration limits") {
-    val kryoBufferProperty = "spark.kryoserializer.buffer"
-    val kryoBufferMaxProperty = "spark.kryoserializer.buffer.max"
+    val kryoBufferProperty = KRYO_SERIALIZER_BUFFER_SIZE.key
+    val kryoBufferMaxProperty = KRYO_SERIALIZER_MAX_BUFFER_SIZE.key
 
     def newKryoInstance(
         conf: SparkConf,
@@ -81,7 +83,7 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
 
   test("basic types") {
     val conf = new SparkConf(false)
-    conf.set("spark.kryo.registrationRequired", "true")
+    conf.set(KRYO_REGISTRATION_REQUIRED, true)
 
     val ser = new KryoSerializer(conf).newInstance()
     def check[T: ClassTag](t: T) {
@@ -114,7 +116,7 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
 
   test("pairs") {
     val conf = new SparkConf(false)
-    conf.set("spark.kryo.registrationRequired", "true")
+    conf.set(KRYO_REGISTRATION_REQUIRED, true)
 
     val ser = new KryoSerializer(conf).newInstance()
     def check[T: ClassTag](t: T) {
@@ -141,7 +143,7 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
 
   test("Scala data structures") {
     val conf = new SparkConf(false)
-    conf.set("spark.kryo.registrationRequired", "true")
+    conf.set(KRYO_REGISTRATION_REQUIRED, true)
 
     val ser = new KryoSerializer(conf).newInstance()
     def check[T: ClassTag](t: T) {
@@ -169,7 +171,7 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
   }
 
   test("Bug: SPARK-10251") {
-    val ser = new KryoSerializer(conf.clone.set("spark.kryo.registrationRequired", "true"))
+    val ser = new KryoSerializer(conf.clone.set(KRYO_REGISTRATION_REQUIRED, true))
       .newInstance()
     def check[T: ClassTag](t: T) {
       assert(ser.deserialize[T](ser.serialize(t)) === t)
@@ -253,7 +255,7 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
     hashMap.put("foo", "bar")
     check(hashMap)
 
-    System.clearProperty("spark.kryo.registrator")
+    System.clearProperty(KRYO_USER_REGISTRATORS.key)
   }
 
   test("kryo with collect") {
@@ -310,7 +312,7 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
     import org.apache.spark.SparkException
 
     val conf = new SparkConf(false)
-    conf.set("spark.kryo.registrator", "this.class.does.not.exist")
+    conf.set(KRYO_USER_REGISTRATORS, "this.class.does.not.exist")
 
     val thrown = intercept[SparkException](new KryoSerializer(conf).newInstance().serialize(1))
     assert(thrown.getMessage.contains("Failed to register classes with Kryo"))
@@ -337,7 +339,7 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
 
   test("registration of HighlyCompressedMapStatus") {
     val conf = new SparkConf(false)
-    conf.set("spark.kryo.registrationRequired", "true")
+    conf.set(KRYO_REGISTRATION_REQUIRED, true)
 
     // these cases require knowing the internals of RoaringBitmap a little.  Blocks span 2^16
     // values, and they use a bitmap (dense) if they have more than 4096 values, and an
@@ -355,7 +357,7 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
 
   test("serialization buffer overflow reporting") {
     import org.apache.spark.SparkException
-    val kryoBufferMaxProperty = "spark.kryoserializer.buffer.max"
+    val kryoBufferMaxProperty = KRYO_SERIALIZER_MAX_BUFFER_SIZE.key
 
     val largeObject = (1 to 1000000).toArray
 
@@ -409,7 +411,7 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
   test("getAutoReset") {
     val ser = new KryoSerializer(new SparkConf).newInstance().asInstanceOf[KryoSerializerInstance]
     assert(ser.getAutoReset)
-    val conf = new SparkConf().set("spark.kryo.registrator",
+    val conf = new SparkConf().set(KRYO_USER_REGISTRATORS,
       classOf[RegistratorWithoutAutoReset].getName)
     val ser2 = new KryoSerializer(conf).newInstance().asInstanceOf[KryoSerializerInstance]
     assert(!ser2.getAutoReset)
@@ -438,10 +440,10 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
   private def testSerializerInstanceReuse(
       autoReset: Boolean, referenceTracking: Boolean, usePool: Boolean): Unit = {
     val conf = new SparkConf(loadDefaults = false)
-      .set("spark.kryo.referenceTracking", referenceTracking.toString)
-      .set("spark.kryo.pool", usePool.toString)
+      .set(KRYO_REFERENCE_TRACKING, referenceTracking)
+      .set(KRYO_USE_POOL, usePool)
     if (!autoReset) {
-      conf.set("spark.kryo.registrator", classOf[RegistratorWithoutAutoReset].getName)
+      conf.set(KRYO_USER_REGISTRATORS, classOf[RegistratorWithoutAutoReset].getName)
     }
     val ser = new KryoSerializer(conf)
     val serInstance = ser.newInstance().asInstanceOf[KryoSerializerInstance]
@@ -478,7 +480,7 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
     implicit val executionContext: ExecutionContext = ExecutionContext.fromExecutor(
       Executors.newFixedThreadPool(4))
 
-    val ser = new KryoSerializer(conf.clone.set("spark.kryo.pool", "true"))
+    val ser = new KryoSerializer(conf.clone.set(KRYO_USE_POOL, true))
 
     val tests = mutable.ListBuffer[Future[Boolean]]()
 
@@ -519,9 +521,9 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
 }
 
 class KryoSerializerAutoResetDisabledSuite extends SparkFunSuite with SharedSparkContext {
-  conf.set("spark.serializer", classOf[KryoSerializer].getName)
-  conf.set("spark.kryo.registrator", classOf[RegistratorWithoutAutoReset].getName)
-  conf.set("spark.kryo.referenceTracking", "true")
+  conf.set(SERIALIZER, classOf[KryoSerializer].getName)
+  conf.set(KRYO_USER_REGISTRATORS, classOf[RegistratorWithoutAutoReset].getName)
+  conf.set(KRYO_REFERENCE_TRACKING, true)
   conf.set("spark.shuffle.manager", "sort")
   conf.set("spark.shuffle.sort.bypassMergeThreshold", "200")
 
diff --git a/core/src/test/scala/org/apache/spark/serializer/SerializerPropertiesSuite.scala b/core/src/test/scala/org/apache/spark/serializer/SerializerPropertiesSuite.scala
index 99882bf..dad080c 100644
--- a/core/src/test/scala/org/apache/spark/serializer/SerializerPropertiesSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/SerializerPropertiesSuite.scala
@@ -24,6 +24,7 @@ import scala.util.Random
 import org.scalatest.Assertions
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.internal.config.Kryo._
 import org.apache.spark.serializer.KryoTest.RegistratorWithoutAutoReset
 
 /**
@@ -50,7 +51,7 @@ class SerializerPropertiesSuite extends SparkFunSuite {
   }
 
   test("KryoSerializer does not support relocation when auto-reset is disabled") {
-    val conf = new SparkConf().set("spark.kryo.registrator",
+    val conf = new SparkConf().set(KRYO_USER_REGISTRATORS,
       classOf[RegistratorWithoutAutoReset].getName)
     val ser = new KryoSerializer(conf)
     assert(!ser.newInstance().asInstanceOf[KryoSerializerInstance].getAutoReset())
diff --git a/core/src/test/scala/org/apache/spark/serializer/UnsafeKryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/UnsafeKryoSerializerSuite.scala
index d63a45a..126ba0e 100644
--- a/core/src/test/scala/org/apache/spark/serializer/UnsafeKryoSerializerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/UnsafeKryoSerializerSuite.scala
@@ -17,17 +17,19 @@
 
 package org.apache.spark.serializer
 
+import org.apache.spark.internal.config.Kryo._
+
 class UnsafeKryoSerializerSuite extends KryoSerializerSuite {
 
   // This test suite should run all tests in KryoSerializerSuite with kryo unsafe.
 
   override def beforeAll() {
-    conf.set("spark.kryo.unsafe", "true")
+    conf.set(KRYO_USE_UNSAFE, true)
     super.beforeAll()
   }
 
   override def afterAll() {
-    conf.set("spark.kryo.unsafe", "false")
+    conf.set(KRYO_USE_UNSAFE, false)
     super.afterAll()
   }
 }
diff --git a/core/src/test/scala/org/apache/spark/storage/FlatmapIteratorSuite.scala b/core/src/test/scala/org/apache/spark/storage/FlatmapIteratorSuite.scala
index 4282850..fc16fe3 100644
--- a/core/src/test/scala/org/apache/spark/storage/FlatmapIteratorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/FlatmapIteratorSuite.scala
@@ -17,7 +17,7 @@
 package org.apache.spark.storage
 
 import org.apache.spark._
-
+import org.apache.spark.internal.config._
 
 class FlatmapIteratorSuite extends SparkFunSuite with LocalSparkContext {
   /* Tests the ability of Spark to deal with user provided iterators from flatMap
@@ -55,7 +55,7 @@ class FlatmapIteratorSuite extends SparkFunSuite with LocalSparkContext {
 
   test("Serializer Reset") {
     val sconf = new SparkConf().setMaster("local").setAppName("serializer_reset_test")
-      .set("spark.serializer.objectStreamReset", "10")
+      .set(SERIALIZER_OBJECT_STREAM_RESET, 10)
     sc = new SparkContext(sconf)
     val expand_size = 500
     val data = sc.parallelize(Seq(1, 2)).
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index d3f94fb..7aca0ad 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -40,6 +40,7 @@ import org.apache.hadoop.fs.Path
 
 import org.apache.spark.{SparkConf, SparkException, SparkFunSuite, TaskContext}
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
 import org.apache.spark.network.util.ByteUnit
 import org.apache.spark.scheduler.SparkListener
 
@@ -829,7 +830,7 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
   test("isDynamicAllocationEnabled") {
     val conf = new SparkConf()
     conf.set("spark.master", "yarn")
-    conf.set("spark.submit.deployMode", "client")
+    conf.set(SUBMIT_DEPLOY_MODE, "client")
     assert(Utils.isDynamicAllocationEnabled(conf) === false)
     assert(Utils.isDynamicAllocationEnabled(
       conf.set("spark.dynamicAllocation.enabled", "false")) === false)
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
index 5efbf4e..de70153 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
@@ -54,8 +54,8 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite
     val conf = new SparkConf(loadDefaults)
     // Make the Java serializer write a reset instruction (TC_RESET) after each object to test
     // for a bug we had with bytes written past the last object in a batch (SPARK-2792)
-    conf.set("spark.serializer.objectStreamReset", "1")
-    conf.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
+    conf.set(SERIALIZER_OBJECT_STREAM_RESET, 1)
+    conf.set(SERIALIZER, "org.apache.spark.serializer.JavaSerializer")
     conf.set("spark.shuffle.spill.compress", codec.isDefined.toString)
     conf.set("spark.shuffle.compress", codec.isDefined.toString)
     codec.foreach { c => conf.set(IO_COMPRESSION_CODEC, c) }
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
index 14148e0..3006409 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
@@ -23,6 +23,7 @@ import scala.collection.mutable.ArrayBuffer
 import scala.util.Random
 
 import org.apache.spark._
+import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.Tests.TEST_MEMORY
 import org.apache.spark.memory.MemoryTestingUtils
 import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
@@ -268,12 +269,12 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
   private def createSparkConf(loadDefaults: Boolean, kryo: Boolean): SparkConf = {
     val conf = new SparkConf(loadDefaults)
     if (kryo) {
-      conf.set("spark.serializer", classOf[KryoSerializer].getName)
+      conf.set(SERIALIZER, classOf[KryoSerializer].getName)
     } else {
       // Make the Java serializer write a reset instruction (TC_RESET) after each object to test
       // for a bug we had with bytes written past the last object in a batch (SPARK-2792)
-      conf.set("spark.serializer.objectStreamReset", "1")
-      conf.set("spark.serializer", classOf[JavaSerializer].getName)
+      conf.set(SERIALIZER_OBJECT_STREAM_RESET, 1)
+      conf.set(SERIALIZER, classOf[JavaSerializer].getName)
     }
     conf.set("spark.shuffle.sort.bypassMergeThreshold", "0")
     // Ensure that we actually have multiple batches per spill file
diff --git a/mllib/src/test/scala/org/apache/spark/ml/attribute/AttributeSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/attribute/AttributeSuite.scala
index eb5f3ca..7f892fd 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/attribute/AttributeSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/attribute/AttributeSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.ml.attribute
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.internal.config.Kryo._
 import org.apache.spark.serializer.KryoSerializer
 import org.apache.spark.sql.types._
 
@@ -225,7 +226,7 @@ class AttributeSuite extends SparkFunSuite {
 
   test("Kryo class register") {
     val conf = new SparkConf(false)
-    conf.set("spark.kryo.registrationRequired", "true")
+    conf.set(KRYO_REGISTRATION_REQUIRED, true)
 
     val ser = new KryoSerializer(conf).newInstance()
 
diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/InstanceSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/InstanceSuite.scala
index cca7399..5a74490 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/feature/InstanceSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/feature/InstanceSuite.scala
@@ -18,13 +18,14 @@
 package org.apache.spark.ml.feature
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.internal.config.Kryo._
 import org.apache.spark.ml.linalg.Vectors
 import org.apache.spark.serializer.KryoSerializer
 
 class InstanceSuite extends SparkFunSuite{
   test("Kryo class register") {
     val conf = new SparkConf(false)
-    conf.set("spark.kryo.registrationRequired", "true")
+    conf.set(KRYO_REGISTRATION_REQUIRED, true)
 
     val ser = new KryoSerializer(conf).newInstance()
 
diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/LabeledPointSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/LabeledPointSuite.scala
index 05c7a58..63c1635 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/feature/LabeledPointSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/feature/LabeledPointSuite.scala
@@ -18,13 +18,14 @@
 package org.apache.spark.ml.feature
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.internal.config.Kryo._
 import org.apache.spark.ml.linalg.Vectors
 import org.apache.spark.serializer.KryoSerializer
 
 class LabeledPointSuite extends SparkFunSuite {
   test("Kryo class register") {
     val conf = new SparkConf(false)
-    conf.set("spark.kryo.registrationRequired", "true")
+    conf.set(KRYO_REGISTRATION_REQUIRED, true)
 
     val ser = new KryoSerializer(conf).newInstance()
 
diff --git a/mllib/src/test/scala/org/apache/spark/ml/tree/impl/TreePointSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/tree/impl/TreePointSuite.scala
index f41abe4..3a44e79 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/tree/impl/TreePointSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/tree/impl/TreePointSuite.scala
@@ -18,12 +18,13 @@
 package org.apache.spark.ml.tree.impl
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.internal.config.Kryo._
 import org.apache.spark.serializer.KryoSerializer
 
 class TreePointSuite extends SparkFunSuite {
   test("Kryo class register") {
     val conf = new SparkConf(false)
-    conf.set("spark.kryo.registrationRequired", "true")
+    conf.set(KRYO_REGISTRATION_REQUIRED, true)
 
     val ser = new KryoSerializer(conf).newInstance()
 
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansSuite.scala
index d18cef7..c4bf5b2 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.mllib.clustering
 import scala.util.Random
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.internal.config.Kryo._
 import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vector, Vectors}
 import org.apache.spark.mllib.util.{LocalClusterSparkContext, MLlibTestSparkContext}
 import org.apache.spark.mllib.util.TestingUtils._
@@ -316,7 +317,7 @@ class KMeansSuite extends SparkFunSuite with MLlibTestSparkContext {
 
   test("Kryo class register") {
     val conf = new SparkConf(false)
-    conf.set("spark.kryo.registrationRequired", "true")
+    conf.set(KRYO_REGISTRATION_REQUIRED, true)
 
     val ser = new KryoSerializer(conf).newInstance()
 
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/feature/Word2VecSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/feature/Word2VecSuite.scala
index f4fa216..a679fe4 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/feature/Word2VecSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/feature/Word2VecSuite.scala
@@ -18,8 +18,10 @@
 package org.apache.spark.mllib.feature
 
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.internal.config.Kryo._
 import org.apache.spark.mllib.linalg.Vectors
 import org.apache.spark.mllib.util.MLlibTestSparkContext
+import org.apache.spark.sql.internal.SQLConf._
 import org.apache.spark.util.Utils
 
 class Word2VecSuite extends SparkFunSuite with MLlibTestSparkContext {
@@ -109,12 +111,16 @@ class Word2VecSuite extends SparkFunSuite with MLlibTestSparkContext {
 
   test("big model load / save") {
     // backupping old values
-    val oldBufferConfValue = spark.conf.get("spark.kryoserializer.buffer.max", "64m")
-    val oldBufferMaxConfValue = spark.conf.get("spark.kryoserializer.buffer", "64k")
+    val oldBufferConfValue = spark.conf.get(KRYO_SERIALIZER_BUFFER_SIZE.key, "64m")
+    val oldBufferMaxConfValue = spark.conf.get(KRYO_SERIALIZER_MAX_BUFFER_SIZE.key, "64k")
+    val oldSetCommandRejectsSparkCoreConfs = spark.conf.get(
+      SET_COMMAND_REJECTS_SPARK_CORE_CONFS.key, "true")
 
     // setting test values to trigger partitioning
-    spark.conf.set("spark.kryoserializer.buffer", "50b")
-    spark.conf.set("spark.kryoserializer.buffer.max", "50b")
+
+    // this is needed to set configurations which are also defined to SparkConf
+    spark.conf.set(SET_COMMAND_REJECTS_SPARK_CORE_CONFS.key, "false")
+    spark.conf.set(KRYO_SERIALIZER_BUFFER_SIZE.key, "50b")
 
     // create a model bigger than 50 Bytes
     val word2VecMap = Map((0 to 10).map(i => s"$i" -> Array.fill(10)(0.1f)): _*)
@@ -137,8 +143,9 @@ class Word2VecSuite extends SparkFunSuite with MLlibTestSparkContext {
         "that spans over multiple partitions", t)
     } finally {
       Utils.deleteRecursively(tempDir)
-      spark.conf.set("spark.kryoserializer.buffer", oldBufferConfValue)
-      spark.conf.set("spark.kryoserializer.buffer.max", oldBufferMaxConfValue)
+      spark.conf.set(KRYO_SERIALIZER_BUFFER_SIZE.key, oldBufferConfValue)
+      spark.conf.set(KRYO_SERIALIZER_MAX_BUFFER_SIZE.key, oldBufferMaxConfValue)
+      spark.conf.set(SET_COMMAND_REJECTS_SPARK_CORE_CONFS.key, oldSetCommandRejectsSparkCoreConfs)
     }
 
   }
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/MatricesSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/MatricesSuite.scala
index 2c3f846..b4520d4 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/MatricesSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/MatricesSuite.scala
@@ -27,6 +27,7 @@ import org.mockito.Mockito.when
 import org.scalatest.mockito.MockitoSugar._
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.internal.config.Kryo._
 import org.apache.spark.ml.{linalg => newlinalg}
 import org.apache.spark.mllib.util.TestingUtils._
 import org.apache.spark.serializer.KryoSerializer
@@ -34,7 +35,7 @@ import org.apache.spark.serializer.KryoSerializer
 class MatricesSuite extends SparkFunSuite {
   test("kryo class register") {
     val conf = new SparkConf(false)
-    conf.set("spark.kryo.registrationRequired", "true")
+    conf.set(KRYO_REGISTRATION_REQUIRED, true)
 
     val ser = new KryoSerializer(conf).newInstance()
 
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala
index 217b4a3..fee0b02 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala
@@ -25,6 +25,7 @@ import org.json4s.jackson.JsonMethods.{parse => parseJson}
 
 import org.apache.spark.{SparkConf, SparkException, SparkFunSuite}
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.Kryo._
 import org.apache.spark.ml.{linalg => newlinalg}
 import org.apache.spark.mllib.util.TestingUtils._
 import org.apache.spark.serializer.KryoSerializer
@@ -38,7 +39,7 @@ class VectorsSuite extends SparkFunSuite with Logging {
 
   test("kryo class register") {
     val conf = new SparkConf(false)
-    conf.set("spark.kryo.registrationRequired", "true")
+    conf.set(KRYO_REGISTRATION_REQUIRED, true)
 
     val ser = new KryoSerializer(conf).newInstance()
     def check[T: ClassTag](t: T) {
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/LabeledPointSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/LabeledPointSuite.scala
index c1449ec..d3366dc 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/regression/LabeledPointSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/LabeledPointSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.mllib.regression
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.internal.config.Kryo._
 import org.apache.spark.ml.feature.{LabeledPoint => NewLabeledPoint}
 import org.apache.spark.mllib.linalg.Vectors
 import org.apache.spark.serializer.KryoSerializer
@@ -57,7 +58,7 @@ class LabeledPointSuite extends SparkFunSuite {
 
   test("Kryo class register") {
     val conf = new SparkConf(false)
-    conf.set("spark.kryo.registrationRequired", "true")
+    conf.set(KRYO_REGISTRATION_REQUIRED, true)
 
     val ser = new KryoSerializer(conf).newInstance()
 
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/stat/distribution/MultivariateGaussianSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/stat/distribution/MultivariateGaussianSuite.scala
index 5b4a260..4c88fd3 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/stat/distribution/MultivariateGaussianSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/stat/distribution/MultivariateGaussianSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.mllib.stat.distribution
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.internal.config.Kryo._
 import org.apache.spark.mllib.linalg.{Matrices, Vectors}
 import org.apache.spark.mllib.util.MLlibTestSparkContext
 import org.apache.spark.mllib.util.TestingUtils._
@@ -83,7 +84,7 @@ class MultivariateGaussianSuite extends SparkFunSuite with MLlibTestSparkContext
 
   test("Kryo class register") {
     val conf = new SparkConf(false)
-    conf.set("spark.kryo.registrationRequired", "true")
+    conf.set(KRYO_REGISTRATION_REQUIRED, true)
 
     val ser = new KryoSerializer(conf).newInstance()
 
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
index 39834fc..e664b64 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
@@ -154,12 +154,11 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf)
       KUBERNETES_DRIVER_SUBMIT_CHECK.key -> "true",
       MEMORY_OVERHEAD_FACTOR.key -> overheadFactor.toString)
 
-    Seq("spark.jars", "spark.files").foreach { key =>
-      conf.getOption(key).foreach { value =>
-        val resolved = KubernetesUtils.resolveFileUrisAndPath(Utils.stringToSeq(value))
-        if (resolved.nonEmpty) {
-          additionalProps.put(key, resolved.mkString(","))
-        }
+    Seq(JARS, FILES).foreach { key =>
+      val value = conf.get(key)
+      val resolved = KubernetesUtils.resolveFileUrisAndPath(value)
+      if (resolved.nonEmpty) {
+        additionalProps.put(key.key, resolved.mkString(","))
       }
     }
 
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStep.scala
index 76b4ec9..bd3f8a1 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStep.scala
@@ -109,21 +109,22 @@ private[spark] class DriverCommandFeatureStep(conf: KubernetesDriverConf)
   }
 
   private def additionalJavaProperties(resource: String): Map[String, String] = {
-    resourceType(APP_RESOURCE_TYPE_JAVA) ++ mergeFileList("spark.jars", Seq(resource))
+    resourceType(APP_RESOURCE_TYPE_JAVA) ++ mergeFileList(JARS, Seq(resource))
   }
 
   private def additionalPythonProperties(resource: String): Map[String, String] = {
     resourceType(APP_RESOURCE_TYPE_PYTHON) ++
-      mergeFileList("spark.files", Seq(resource) ++ conf.pyFiles)
+      mergeFileList(FILES, Seq(resource) ++ conf.pyFiles)
   }
 
   private def additionalRProperties(resource: String): Map[String, String] = {
-    resourceType(APP_RESOURCE_TYPE_R) ++ mergeFileList("spark.files", Seq(resource))
+    resourceType(APP_RESOURCE_TYPE_R) ++ mergeFileList(FILES, Seq(resource))
   }
 
-  private def mergeFileList(key: String, filesToAdd: Seq[String]): Map[String, String] = {
-    val existing = Utils.stringToSeq(conf.get(key, ""))
-    Map(key -> (existing ++ filesToAdd).distinct.mkString(","))
+  private def mergeFileList(key: ConfigEntry[Seq[String]], filesToAdd: Seq[String])
+    : Map[String, String] = {
+    val existing = conf.get(key)
+    Map(key.key -> (existing ++ filesToAdd).distinct.mkString(","))
   }
 
   private def resourceType(resType: String): Map[String, String] = {
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
index 90255a5..ccf88cc 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
@@ -143,7 +143,7 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
     val sparkConf = new SparkConf()
       .set(KUBERNETES_DRIVER_POD_NAME, "spark-driver-pod")
       .setJars(allJars)
-      .set("spark.files", allFiles.mkString(","))
+      .set(FILES, allFiles)
       .set(CONTAINER_IMAGE, "spark-driver:latest")
     val kubernetesConf = KubernetesTestConf.createDriverConf(sparkConf = sparkConf)
 
@@ -154,8 +154,8 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
       "spark.app.id" -> KubernetesTestConf.APP_ID,
       KUBERNETES_EXECUTOR_POD_NAME_PREFIX.key -> kubernetesConf.resourceNamePrefix,
       "spark.kubernetes.submitInDriver" -> "true",
-      "spark.jars" -> "/opt/spark/jar1.jar,hdfs:///opt/spark/jar2.jar",
-      "spark.files" -> "https://localhost:9000/file1.txt,/opt/spark/file2.txt",
+      JARS.key -> "/opt/spark/jar1.jar,hdfs:///opt/spark/jar2.jar",
+      FILES.key -> "https://localhost:9000/file1.txt,/opt/spark/file2.txt",
       MEMORY_OVERHEAD_FACTOR.key -> MEMORY_OVERHEAD_FACTOR.defaultValue.get.toString)
     assert(additionalProperties === expectedSparkConf)
   }
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala
index c869803..e539c8e 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala
@@ -27,6 +27,7 @@ import org.scalatest.concurrent.Eventually
 
 import org.apache.spark.deploy.k8s.integrationtest.TestConstants._
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.JARS
 import org.apache.spark.internal.config.Tests.IS_TESTING
 import org.apache.spark.internal.config.UI.UI_ENABLED
 
@@ -86,7 +87,7 @@ private[spark] class SparkAppConf {
 
   def get(key: String): String = map.getOrElse(key, "")
 
-  def setJars(jars: Seq[String]): Unit = set("spark.jars", jars.mkString(","))
+  def setJars(jars: Seq[String]): Unit = set(JARS.key, jars.mkString(","))
 
   override def toString: String = map.toString
 
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala
index 32ac4f3..bc1247a 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala
@@ -25,6 +25,7 @@ import org.apache.spark.deploy.mesos.config._
 import org.apache.spark.deploy.mesos.ui.MesosClusterUI
 import org.apache.spark.deploy.rest.mesos.MesosRestServer
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.Deploy._
 import org.apache.spark.scheduler.cluster.mesos._
 import org.apache.spark.util.{CommandLineUtils, ShutdownHookManager, SparkUncaughtExceptionHandler, Utils}
 
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala
index dd0b2ba..2b8655c 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala
@@ -63,11 +63,6 @@ package object config {
       .timeConf(TimeUnit.SECONDS)
       .createWithDefaultString("30s")
 
-  private[spark] val RECOVERY_MODE =
-    ConfigBuilder("spark.deploy.recoveryMode")
-      .stringConf
-      .createWithDefault("NONE")
-
   private[spark] val DISPATCHER_WEBUI_URL =
     ConfigBuilder("spark.mesos.dispatcher.webui.url")
       .doc("Set the Spark Mesos dispatcher webui_url for interacting with the " +
@@ -75,13 +70,6 @@ package object config {
       .stringConf
       .createOptional
 
-  private[spark] val ZOOKEEPER_URL =
-    ConfigBuilder("spark.deploy.zookeeper.url")
-      .doc("When `spark.deploy.recoveryMode` is set to ZOOKEEPER, this " +
-        "configuration is used to set the zookeeper URL to connect to.")
-      .stringConf
-      .createOptional
-
   private[spark] val HISTORY_SERVER_URL =
     ConfigBuilder("spark.mesos.dispatcher.historyServer.url")
       .doc("Set the URL of the history server. The dispatcher will then " +
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterPersistenceEngine.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterPersistenceEngine.scala
index 61ab3e8..123412f 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterPersistenceEngine.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterPersistenceEngine.scala
@@ -26,6 +26,7 @@ import org.apache.zookeeper.KeeperException.NoNodeException
 import org.apache.spark.SparkConf
 import org.apache.spark.deploy.SparkCuratorUtil
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.Deploy._
 import org.apache.spark.util.Utils
 
 /**
@@ -94,13 +95,13 @@ private[spark] class ZookeeperMesosClusterPersistenceEngine(
     zk: CuratorFramework,
     conf: SparkConf)
   extends MesosClusterPersistenceEngine with Logging {
-  private val WORKING_DIR =
-    conf.get("spark.deploy.zookeeper.dir", "/spark_mesos_dispatcher") + "/" + baseDir
+  private val workingDir =
+    conf.get(ZOOKEEPER_DIRECTORY).getOrElse("/spark_mesos_dispatcher") + "/" + baseDir
 
-  SparkCuratorUtil.mkdir(zk, WORKING_DIR)
+  SparkCuratorUtil.mkdir(zk, workingDir)
 
   def path(name: String): String = {
-    WORKING_DIR + "/" + name
+    workingDir + "/" + name
   }
 
   override def expunge(name: String): Unit = {
@@ -129,6 +130,6 @@ private[spark] class ZookeeperMesosClusterPersistenceEngine(
   }
 
   override def fetchAll[T](): Iterable[T] = {
-    zk.getChildren.forPath(WORKING_DIR).asScala.flatMap(fetch[T])
+    zk.getChildren.forPath(workingDir).asScala.flatMap(fetch[T])
   }
 }
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
index 021b1ac..8c961a5 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -32,7 +32,7 @@ import org.apache.mesos.Protos.TaskStatus.Reason
 import org.apache.spark.{SecurityManager, SparkConf, SparkException, TaskState}
 import org.apache.spark.deploy.mesos.{config, MesosDriverDescription}
 import org.apache.spark.deploy.rest.{CreateSubmissionResponse, KillSubmissionResponse, SubmissionStatusResponse}
-import org.apache.spark.internal.config.{CORES_MAX, EXECUTOR_LIBRARY_PATH, EXECUTOR_MEMORY}
+import org.apache.spark.internal.config._
 import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.util.Utils
 
@@ -432,7 +432,7 @@ private[spark] class MesosClusterScheduler(
   private def getDriverUris(desc: MesosDriverDescription): List[CommandInfo.URI] = {
     val confUris = List(conf.getOption("spark.mesos.uris"),
       desc.conf.getOption("spark.mesos.uris"),
-      desc.conf.getOption("spark.submit.pyFiles")).flatMap(
+      Some(desc.conf.get(SUBMIT_PYTHON_FILES).mkString(","))).flatMap(
       _.map(_.split(",").map(_.trim))
     ).flatten
 
@@ -534,16 +534,16 @@ private[spark] class MesosClusterScheduler(
     desc.conf.getOption(CORES_MAX.key).foreach { v =>
       options ++= Seq("--total-executor-cores", v)
     }
-    desc.conf.getOption("spark.submit.pyFiles").foreach { pyFiles =>
-      val formattedFiles = pyFiles.split(",")
-        .map { path => new File(sandboxPath, path.split("/").last).toString() }
-        .mkString(",")
-      options ++= Seq("--py-files", formattedFiles)
-    }
+
+    val pyFiles = desc.conf.get(SUBMIT_PYTHON_FILES)
+    val formattedFiles = pyFiles.map { path =>
+      new File(sandboxPath, path.split("/").last).toString()
+    }.mkString(",")
+    options ++= Seq("--py-files", formattedFiles)
 
     // --conf
     val replicatedOptionsBlacklist = Set(
-      "spark.jars", // Avoids duplicate classes in classpath
+      JARS.key, // Avoids duplicate classes in classpath
       "spark.submit.deployMode", // this would be set to `cluster`, but we need client
       "spark.master" // this contains the address of the dispatcher, not master
     )
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 8492180..7992292 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -68,7 +68,7 @@ private[spark] class Client(
   private val yarnClient = YarnClient.createYarnClient
   private val hadoopConf = new YarnConfiguration(SparkHadoopUtil.newConfiguration(sparkConf))
 
-  private val isClusterMode = sparkConf.get("spark.submit.deployMode", "client") == "cluster"
+  private val isClusterMode = sparkConf.get(SUBMIT_DEPLOY_MODE) == "cluster"
 
   // AM related configurations
   private val amMemory = if (isClusterMode) {
@@ -1532,8 +1532,8 @@ private[spark] class YarnClusterApplication extends SparkApplication {
   override def start(args: Array[String], conf: SparkConf): Unit = {
     // SparkSubmit would use yarn cache to distribute files & jars in yarn mode,
     // so remove them from sparkConf here for yarn mode.
-    conf.remove("spark.jars")
-    conf.remove("spark.files")
+    conf.remove(JARS)
+    conf.remove(FILES)
 
     new Client(new ClientArguments(args), conf).run()
   }
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index 9acd995..25827fd 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -40,6 +40,7 @@ import org.scalatest.Matchers
 
 import org.apache.spark.{SparkConf, SparkFunSuite, TestUtils}
 import org.apache.spark.deploy.yarn.config._
+import org.apache.spark.internal.config._
 import org.apache.spark.util.{SparkConfWithEnv, Utils}
 
 class ClientSuite extends SparkFunSuite with Matchers {
@@ -368,7 +369,7 @@ class ClientSuite extends SparkFunSuite with Matchers {
       val resources = Map("fpga" -> 2, "gpu" -> 3)
       ResourceRequestTestHelper.initializeResourceTypes(resources.keys.toSeq)
 
-      val conf = new SparkConf().set("spark.submit.deployMode", deployMode)
+      val conf = new SparkConf().set(SUBMIT_DEPLOY_MODE, deployMode)
       resources.foreach { case (name, v) =>
         conf.set(prefix + name, v.toString)
       }
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index b7e83c8..faddb8f 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -443,7 +443,7 @@ private object YarnClusterDriver extends Logging with Matchers {
 
       // If we are running in yarn-cluster mode, verify that driver logs links and present and are
       // in the expected format.
-      if (conf.get("spark.submit.deployMode") == "cluster") {
+      if (conf.get(SUBMIT_DEPLOY_MODE) == "cluster") {
         assert(listener.driverLogs.nonEmpty)
         val driverLogs = listener.driverLogs.get
         assert(driverLogs.size === 2)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSerializerRegistratorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSerializerRegistratorSuite.scala
index 68f7de0..69728ef 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSerializerRegistratorSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSerializerRegistratorSuite.scala
@@ -21,6 +21,7 @@ import com.esotericsoftware.kryo.{Kryo, Serializer}
 import com.esotericsoftware.kryo.io.{Input, Output}
 
 import org.apache.spark.SparkConf
+import org.apache.spark.internal.config.Kryo._
 import org.apache.spark.serializer.KryoRegistrator
 import org.apache.spark.sql.test.SharedSQLContext
 
@@ -33,7 +34,7 @@ class DatasetSerializerRegistratorSuite extends QueryTest with SharedSQLContext
 
   override protected def sparkConf: SparkConf = {
     // Make sure we use the KryoRegistrator
-    super.sparkConf.set("spark.kryo.registrator", TestRegistrator().getClass.getCanonicalName)
+    super.sparkConf.set(KRYO_USER_REGISTRATORS, TestRegistrator().getClass.getCanonicalName)
   }
 
   test("Kryo registrator") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArrayBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArrayBenchmark.scala
index e174dc6..0869e25 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArrayBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArrayBenchmark.scala
@@ -45,8 +45,8 @@ object ExternalAppendOnlyUnsafeRowArrayBenchmark extends BenchmarkBase {
   private val conf = new SparkConf(false)
     // Make the Java serializer write a reset instruction (TC_RESET) after each object to test
     // for a bug we had with bytes written past the last object in a batch (SPARK-2792)
-    .set("spark.serializer.objectStreamReset", "1")
-    .set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
+    .set(config.SERIALIZER_OBJECT_STREAM_RESET, 1)
+    .set(config.SERIALIZER, "org.apache.spark.serializer.JavaSerializer")
 
   private def withFakeTaskContext(f: => Unit): Unit = {
     val sc = new SparkContext("local", "test", conf)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
index 7b55e83..1c89910 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
@@ -22,7 +22,8 @@ import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream,
 import scala.util.Random
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
-import org.apache.spark.internal.config.MEMORY_OFFHEAP_ENABLED
+import org.apache.spark.internal.config._
+import org.apache.spark.internal.config.Kryo._
 import org.apache.spark.memory.{TaskMemoryManager, UnifiedMemoryManager}
 import org.apache.spark.serializer.KryoSerializer
 import org.apache.spark.sql.catalyst.InternalRow
@@ -309,7 +310,7 @@ class HashedRelationSuite extends SparkFunSuite with SharedSQLContext {
 
   test("Spark-14521") {
     val ser = new KryoSerializer(
-      (new SparkConf).set("spark.kryo.referenceTracking", "false")).newInstance()
+      (new SparkConf).set(KRYO_REFERENCE_TRACKING, false)).newInstance()
     val key = Seq(BoundReference(0, LongType, false))
 
     // Testing Kryo serialization of HashedRelation


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org