Posted to commits@spark.apache.org by sr...@apache.org on 2019/01/17 02:57:52 UTC
[spark] branch master updated: [SPARK-26466][CORE] Use ConfigEntry for hardcoded configs for submit categories.
This is an automated email from the ASF dual-hosted git repository.
srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 38f0307 [SPARK-26466][CORE] Use ConfigEntry for hardcoded configs for submit categories.
38f0307 is described below
commit 38f030725c561979ca98b2a6cc7ca6c02a1f80ed
Author: Jungtaek Lim (HeartSaVioR) <ka...@gmail.com>
AuthorDate: Wed Jan 16 20:57:21 2019 -0600
[SPARK-26466][CORE] Use ConfigEntry for hardcoded configs for submit categories.
## What changes were proposed in this pull request?
This PR changes the hardcoded configs below to use `ConfigEntry`:
* spark.kryo
* spark.kryoserializer
* spark.serializer
* spark.jars
* spark.files
* spark.submit
* spark.deploy
* spark.worker
This patch doesn't change configs that are not relevant to `SparkConf` (e.g. system properties).
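For context, a `ConfigEntry` couples a config key, its value type, and its default in a single declaration, so call sites can no longer misspell the key or disagree on the default. Below is a minimal self-contained sketch of the idea; it deliberately does not use Spark's real `ConfigBuilder`/`ConfigEntry` (those live in `org.apache.spark.internal.config` and are `private[spark]`):

```scala
// Simplified stand-in for Spark's ConfigEntry: key, default, and parser in one place.
final case class ConfigEntry[T](key: String, default: T, parse: String => T)

object DeployConfigs {
  // Declared once; every caller shares the key, the type, and the default.
  val RETAINED_DRIVERS: ConfigEntry[Int] =
    ConfigEntry("spark.deploy.retainedDrivers", 200, _.toInt)
}

class MiniConf {
  private val settings = scala.collection.mutable.Map.empty[String, String]

  def set[T](entry: ConfigEntry[T], value: T): this.type = {
    settings(entry.key) = value.toString
    this
  }

  // Typed read: replaces conf.getInt("spark.deploy.retainedDrivers", 200).
  def get[T](entry: ConfigEntry[T]): T =
    settings.get(entry.key).map(entry.parse).getOrElse(entry.default)
}

object Demo extends App {
  val conf = new MiniConf().set(DeployConfigs.RETAINED_DRIVERS, 100)
  assert(conf.get(DeployConfigs.RETAINED_DRIVERS) == 100)
}
```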
## How was this patch tested?
Existing tests.
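The test migrations in the diff all follow one mechanical pattern: raw-string `set`/`get` calls are replaced with typed calls against the shared entry. A minimal before/after sketch, assuming compilation inside Spark's own source tree (these `ConfigEntry`-based setters are `private[spark]`):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.internal.config.Deploy.MAX_EXECUTOR_RETRIES

object ConfigEntryMigrationSketch {
  // Before this commit: key and value are both raw strings.
  val before = new SparkConf().set("spark.deploy.maxExecutorRetries", "1")

  // After: the key is declared once as a ConfigEntry[Int] and the value is typed,
  // as in the JobCancellationSuite hunk below.
  val after = new SparkConf().set(MAX_EXECUTOR_RETRIES, 1)
}
```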
Closes #23532 from HeartSaVioR/SPARK-26466-v2.
Authored-by: Jungtaek Lim (HeartSaVioR) <ka...@gmail.com>
Signed-off-by: Sean Owen <se...@databricks.com>
---
.../main/scala/org/apache/spark/SparkConf.scala | 21 +++----
.../main/scala/org/apache/spark/SparkContext.scala | 4 +-
.../src/main/scala/org/apache/spark/SparkEnv.scala | 9 ++-
.../main/scala/org/apache/spark/api/r/RUtils.scala | 3 +-
.../apache/spark/deploy/FaultToleranceTest.scala | 6 +-
.../org/apache/spark/deploy/SparkCuratorUtil.scala | 3 +-
.../org/apache/spark/deploy/SparkSubmit.scala | 29 +++++----
.../org/apache/spark/deploy/master/Master.scala | 48 +++++++--------
.../spark/deploy/master/RecoveryModeFactory.scala | 7 ++-
.../master/ZooKeeperLeaderElectionAgent.scala | 5 +-
.../deploy/master/ZooKeeperPersistenceEngine.scala | 15 ++---
.../apache/spark/deploy/worker/DriverRunner.scala | 6 +-
.../org/apache/spark/deploy/worker/Worker.scala | 20 +++----
.../spark/deploy/worker/WorkerArguments.scala | 5 +-
.../spark/deploy/worker/ui/WorkerWebUI.scala | 2 -
.../org/apache/spark/internal/config/Deploy.scala | 68 ++++++++++++++++++++++
.../org/apache/spark/internal/config/Kryo.scala | 57 ++++++++++++++++++
.../org/apache/spark/internal/config/Worker.scala | 63 ++++++++++++++++++++
.../org/apache/spark/internal/config/package.scala | 31 ++++++++++
.../apache/spark/serializer/JavaSerializer.scala | 5 +-
.../apache/spark/serializer/KryoSerializer.scala | 29 ++++-----
.../main/scala/org/apache/spark/util/Utils.scala | 12 ++--
.../org/apache/spark/JobCancellationSuite.scala | 3 +-
.../test/scala/org/apache/spark/ShuffleSuite.scala | 3 +-
.../scala/org/apache/spark/SparkConfSuite.scala | 31 +++++-----
.../spark/api/python/PythonBroadcastSuite.scala | 3 +-
.../apache/spark/broadcast/BroadcastSuite.scala | 3 +-
.../org/apache/spark/deploy/SparkSubmitSuite.scala | 34 +++++------
.../apache/spark/deploy/master/MasterSuite.scala | 14 ++---
.../deploy/master/PersistenceEngineSuite.scala | 3 +-
.../deploy/rest/SubmitRestProtocolSuite.scala | 3 +-
.../apache/spark/deploy/worker/WorkerSuite.scala | 9 +--
.../apache/spark/scheduler/MapStatusSuite.scala | 2 +-
.../serializer/GenericAvroSerializerSuite.scala | 3 +-
.../apache/spark/serializer/KryoBenchmark.scala | 8 ++-
.../spark/serializer/KryoSerializerBenchmark.scala | 8 ++-
.../KryoSerializerDistributedSuite.scala | 4 +-
.../KryoSerializerResizableOutputSuite.scala | 14 +++--
.../spark/serializer/KryoSerializerSuite.scala | 44 +++++++-------
.../serializer/SerializerPropertiesSuite.scala | 3 +-
.../serializer/UnsafeKryoSerializerSuite.scala | 6 +-
.../spark/storage/FlatmapIteratorSuite.scala | 4 +-
.../scala/org/apache/spark/util/UtilsSuite.scala | 3 +-
.../collection/ExternalAppendOnlyMapSuite.scala | 4 +-
.../util/collection/ExternalSorterSuite.scala | 7 ++-
.../apache/spark/ml/attribute/AttributeSuite.scala | 3 +-
.../apache/spark/ml/feature/InstanceSuite.scala | 3 +-
.../spark/ml/feature/LabeledPointSuite.scala | 3 +-
.../apache/spark/ml/tree/impl/TreePointSuite.scala | 3 +-
.../spark/mllib/clustering/KMeansSuite.scala | 3 +-
.../apache/spark/mllib/feature/Word2VecSuite.scala | 19 ++++--
.../apache/spark/mllib/linalg/MatricesSuite.scala | 3 +-
.../apache/spark/mllib/linalg/VectorsSuite.scala | 3 +-
.../spark/mllib/regression/LabeledPointSuite.scala | 3 +-
.../distribution/MultivariateGaussianSuite.scala | 3 +-
.../k8s/features/BasicDriverFeatureStep.scala | 11 ++--
.../k8s/features/DriverCommandFeatureStep.scala | 13 +++--
.../k8s/features/BasicDriverFeatureStepSuite.scala | 6 +-
.../integrationtest/KubernetesTestComponents.scala | 3 +-
.../deploy/mesos/MesosClusterDispatcher.scala | 1 +
.../org/apache/spark/deploy/mesos/config.scala | 12 ----
.../mesos/MesosClusterPersistenceEngine.scala | 11 ++--
.../cluster/mesos/MesosClusterScheduler.scala | 18 +++---
.../org/apache/spark/deploy/yarn/Client.scala | 6 +-
.../org/apache/spark/deploy/yarn/ClientSuite.scala | 3 +-
.../spark/deploy/yarn/YarnClusterSuite.scala | 2 +-
.../sql/DatasetSerializerRegistratorSuite.scala | 3 +-
...ExternalAppendOnlyUnsafeRowArrayBenchmark.scala | 4 +-
.../sql/execution/joins/HashedRelationSuite.scala | 5 +-
69 files changed, 528 insertions(+), 280 deletions(-)
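For orientation before the hunks: the new `Deploy`, `Kryo`, and `Worker` config objects added below declare their entries through Spark's internal `ConfigBuilder` DSL (`private[spark]`, so this only compiles inside Spark's own source tree). The shapes used in this commit, abbreviated from the hunks that follow:

```scala
import java.util.concurrent.TimeUnit
import org.apache.spark.internal.config.ConfigBuilder
import org.apache.spark.network.util.ByteUnit

object DslShapes {
  val RECOVERY_MODE = ConfigBuilder("spark.deploy.recoveryMode")
    .stringConf                       // typed parser
    .createWithDefault("NONE")        // ConfigEntry[String] with a default

  val ZOOKEEPER_DIRECTORY = ConfigBuilder("spark.deploy.zookeeper.dir")
    .stringConf
    .createOptional                   // OptionalConfigEntry[String], read as Option[String]

  val JARS = ConfigBuilder("spark.jars")
    .stringConf
    .toSequence                       // comma-separated string <-> Seq[String]
    .createWithDefault(Nil)

  val WORKER_DRIVER_TERMINATE_TIMEOUT = ConfigBuilder("spark.worker.driverTerminateTimeout")
    .timeConf(TimeUnit.MILLISECONDS)  // "10s" is parsed to milliseconds
    .createWithDefaultString("10s")

  val KRYO_SERIALIZER_BUFFER_SIZE = ConfigBuilder("spark.kryoserializer.buffer")
    .bytesConf(ByteUnit.KiB)          // size strings like "64k" parsed to KiB
    .createWithDefaultString("64k")
}
```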
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 22bcb81..b596be0 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -28,6 +28,7 @@ import org.apache.avro.{Schema, SchemaNormalization}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.internal.config.History._
+import org.apache.spark.internal.config.Kryo._
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.util.Utils
@@ -123,7 +124,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
/** Set JAR files to distribute to the cluster. */
def setJars(jars: Seq[String]): SparkConf = {
for (jar <- jars if (jar == null)) logWarning("null jar passed to SparkContext constructor")
- set("spark.jars", jars.filter(_ != null).mkString(","))
+ set(JARS, jars.filter(_ != null))
}
/** Set JAR files to distribute to the cluster. (Java-friendly version.) */
@@ -201,12 +202,12 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
*/
def registerKryoClasses(classes: Array[Class[_]]): SparkConf = {
val allClassNames = new LinkedHashSet[String]()
- allClassNames ++= get("spark.kryo.classesToRegister", "").split(',').map(_.trim)
+ allClassNames ++= get(KRYO_CLASSES_TO_REGISTER).map(_.trim)
.filter(!_.isEmpty)
allClassNames ++= classes.map(_.getName)
- set("spark.kryo.classesToRegister", allClassNames.mkString(","))
- set("spark.serializer", classOf[KryoSerializer].getName)
+ set(KRYO_CLASSES_TO_REGISTER, allClassNames.toSeq)
+ set(SERIALIZER, classOf[KryoSerializer].getName)
this
}
@@ -547,20 +548,20 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
case "yarn-cluster" =>
logWarning(warning)
set("spark.master", "yarn")
- set("spark.submit.deployMode", "cluster")
+ set(SUBMIT_DEPLOY_MODE, "cluster")
case "yarn-client" =>
logWarning(warning)
set("spark.master", "yarn")
- set("spark.submit.deployMode", "client")
+ set(SUBMIT_DEPLOY_MODE, "client")
case _ => // Any other unexpected master will be checked when creating scheduler backend.
}
}
- if (contains("spark.submit.deployMode")) {
- get("spark.submit.deployMode") match {
+ if (contains(SUBMIT_DEPLOY_MODE)) {
+ get(SUBMIT_DEPLOY_MODE) match {
case "cluster" | "client" =>
- case e => throw new SparkException("spark.submit.deployMode can only be \"cluster\" or " +
- "\"client\".")
+ case e => throw new SparkException(s"${SUBMIT_DEPLOY_MODE.key} can only be " +
+ "\"cluster\" or \"client\".")
}
}
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 3bbf9f3..c9afc79 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -229,7 +229,7 @@ class SparkContext(config: SparkConf) extends Logging {
def jars: Seq[String] = _jars
def files: Seq[String] = _files
def master: String = _conf.get("spark.master")
- def deployMode: String = _conf.getOption("spark.submit.deployMode").getOrElse("client")
+ def deployMode: String = _conf.get(SUBMIT_DEPLOY_MODE)
def appName: String = _conf.get("spark.app.name")
private[spark] def isEventLogEnabled: Boolean = _conf.get(EVENT_LOG_ENABLED)
@@ -2640,7 +2640,7 @@ object SparkContext extends Logging {
case SparkMasterRegex.LOCAL_N_REGEX(threads) => convertToInt(threads)
case SparkMasterRegex.LOCAL_N_FAILURES_REGEX(threads, _) => convertToInt(threads)
case "yarn" =>
- if (conf != null && conf.getOption("spark.submit.deployMode").contains("cluster")) {
+ if (conf != null && conf.get(SUBMIT_DEPLOY_MODE) == "cluster") {
conf.getInt(DRIVER_CORES.key, 0)
} else {
0
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index ba5ed8a..4d7542c 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -274,14 +274,13 @@ object SparkEnv extends Logging {
}
}
- // Create an instance of the class named by the given SparkConf property, or defaultClassName
+ // Create an instance of the class named by the given SparkConf property
// if the property is not set, possibly initializing it with our conf
- def instantiateClassFromConf[T](propertyName: String, defaultClassName: String): T = {
- instantiateClass[T](conf.get(propertyName, defaultClassName))
+ def instantiateClassFromConf[T](propertyName: ConfigEntry[String]): T = {
+ instantiateClass[T](conf.get(propertyName))
}
- val serializer = instantiateClassFromConf[Serializer](
- "spark.serializer", "org.apache.spark.serializer.JavaSerializer")
+ val serializer = instantiateClassFromConf[Serializer](SERIALIZER)
logDebug(s"Using serializer: ${serializer.getClass}")
val serializerManager = new SerializerManager(serializer, conf, ioEncryptionKey)
diff --git a/core/src/main/scala/org/apache/spark/api/r/RUtils.scala b/core/src/main/scala/org/apache/spark/api/r/RUtils.scala
index 9bf35af..6832223 100644
--- a/core/src/main/scala/org/apache/spark/api/r/RUtils.scala
+++ b/core/src/main/scala/org/apache/spark/api/r/RUtils.scala
@@ -23,6 +23,7 @@ import java.util.Arrays
import org.apache.spark.{SparkEnv, SparkException}
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.api.python.PythonUtils
+import org.apache.spark.internal.config._
private[spark] object RUtils {
// Local path where R binary packages built from R source code contained in the spark
@@ -63,7 +64,7 @@ private[spark] object RUtils {
(sys.props("spark.master"), sys.props("spark.submit.deployMode"))
} else {
val sparkConf = SparkEnv.get.conf
- (sparkConf.get("spark.master"), sparkConf.get("spark.submit.deployMode", "client"))
+ (sparkConf.get("spark.master"), sparkConf.get(SUBMIT_DEPLOY_MODE))
}
val isYarnCluster = master != null && master.contains("yarn") && deployMode == "cluster"
diff --git a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
index 0679bdf..a662430 100644
--- a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
@@ -60,7 +60,7 @@ import org.apache.spark.util.{ThreadUtils, Utils}
private object FaultToleranceTest extends App with Logging {
private val conf = new SparkConf()
- private val ZK_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark")
+ private val zkDir = conf.get(config.Deploy.ZOOKEEPER_DIRECTORY).getOrElse("/spark")
private val masters = ListBuffer[TestMasterInfo]()
private val workers = ListBuffer[TestWorkerInfo]()
@@ -87,8 +87,8 @@ private object FaultToleranceTest extends App with Logging {
terminateCluster()
// Clear ZK directories in between tests (for speed purposes)
- SparkCuratorUtil.deleteRecursive(zk, ZK_DIR + "/spark_leader")
- SparkCuratorUtil.deleteRecursive(zk, ZK_DIR + "/master_status")
+ SparkCuratorUtil.deleteRecursive(zk, zkDir + "/spark_leader")
+ SparkCuratorUtil.deleteRecursive(zk, zkDir + "/master_status")
}
test("sanity-basic") {
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkCuratorUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkCuratorUtil.scala
index 8247110..8118c01 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkCuratorUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkCuratorUtil.scala
@@ -25,6 +25,7 @@ import org.apache.zookeeper.KeeperException
import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.Deploy.ZOOKEEPER_URL
private[spark] object SparkCuratorUtil extends Logging {
@@ -35,7 +36,7 @@ private[spark] object SparkCuratorUtil extends Logging {
def newClient(
conf: SparkConf,
- zkUrlConf: String = "spark.deploy.zookeeper.url"): CuratorFramework = {
+ zkUrlConf: String = ZOOKEEPER_URL.key): CuratorFramework = {
val ZK_URL = conf.get(zkUrlConf)
val zk = CuratorFrameworkFactory.newClient(ZK_URL,
ZK_SESSION_TIMEOUT_MILLIS, ZK_CONNECTION_TIMEOUT_MILLIS,
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 57a8bdf..b403cc4 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -437,7 +437,7 @@ private[spark] class SparkSubmit extends Logging {
}
if (localPyFiles != null) {
- sparkConf.set("spark.submit.pyFiles", localPyFiles)
+ sparkConf.set(SUBMIT_PYTHON_FILES, localPyFiles.split(",").toSeq)
}
// In YARN mode for an R app, add the SparkR package archive and the R package
@@ -614,11 +614,11 @@ private[spark] class SparkSubmit extends Logging {
// For YARN cluster mode, the jar is already distributed on each node as "app.jar"
// For python and R files, the primary resource is already distributed as a regular file
if (!isYarnCluster && !args.isPython && !args.isR) {
- var jars = sparkConf.getOption("spark.jars").map(x => x.split(",").toSeq).getOrElse(Seq.empty)
+ var jars = sparkConf.get(JARS)
if (isUserJar(args.primaryResource)) {
jars = jars ++ Seq(args.primaryResource)
}
- sparkConf.set("spark.jars", jars.mkString(","))
+ sparkConf.set(JARS, jars)
}
// In standalone cluster mode, use the REST client to submit the application (Spark 1.3+).
@@ -681,7 +681,7 @@ private[spark] class SparkSubmit extends Logging {
// Second argument is main class
childArgs += (args.primaryResource, "")
if (args.pyFiles != null) {
- sparkConf.set("spark.submit.pyFiles", args.pyFiles)
+ sparkConf.set(SUBMIT_PYTHON_FILES, args.pyFiles.split(",").toSeq)
}
} else if (args.isR) {
// Second argument is main class
@@ -748,18 +748,17 @@ private[spark] class SparkSubmit extends Logging {
// Resolve and format python file paths properly before adding them to the PYTHONPATH.
// The resolving part is redundant in the case of --py-files, but necessary if the user
// explicitly sets `spark.submit.pyFiles` in his/her default properties file.
- sparkConf.getOption("spark.submit.pyFiles").foreach { pyFiles =>
- val resolvedPyFiles = Utils.resolveURIs(pyFiles)
- val formattedPyFiles = if (!isYarnCluster && !isMesosCluster) {
- PythonRunner.formatPaths(resolvedPyFiles).mkString(",")
- } else {
- // Ignoring formatting python path in yarn and mesos cluster mode, these two modes
- // support dealing with remote python files, they could distribute and add python files
- // locally.
- resolvedPyFiles
- }
- sparkConf.set("spark.submit.pyFiles", formattedPyFiles)
+ val pyFiles = sparkConf.get(SUBMIT_PYTHON_FILES)
+ val resolvedPyFiles = Utils.resolveURIs(pyFiles.mkString(","))
+ val formattedPyFiles = if (!isYarnCluster && !isMesosCluster) {
+ PythonRunner.formatPaths(resolvedPyFiles).mkString(",")
+ } else {
+ // Ignoring formatting python path in yarn and mesos cluster mode, these two modes
+ // support dealing with remote python files, they could distribute and add python files
+ // locally.
+ resolvedPyFiles
}
+ sparkConf.set(SUBMIT_PYTHON_FILES, formattedPyFiles.split(",").toSeq)
(childArgs, childClasspath, sparkConf, childMainClass)
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 32f6d1f..b26da8a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -34,7 +34,9 @@ import org.apache.spark.deploy.master.ui.MasterWebUI
import org.apache.spark.deploy.rest.StandaloneRestServer
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
+import org.apache.spark.internal.config.Deploy._
import org.apache.spark.internal.config.UI._
+import org.apache.spark.internal.config.Worker._
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.rpc._
import org.apache.spark.serializer.{JavaSerializer, Serializer}
@@ -56,12 +58,12 @@ private[deploy] class Master(
// For application IDs
private def createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US)
- private val WORKER_TIMEOUT_MS = conf.getLong("spark.worker.timeout", 60) * 1000
- private val RETAINED_APPLICATIONS = conf.getInt("spark.deploy.retainedApplications", 200)
- private val RETAINED_DRIVERS = conf.getInt("spark.deploy.retainedDrivers", 200)
- private val REAPER_ITERATIONS = conf.getInt("spark.dead.worker.persistence", 15)
- private val RECOVERY_MODE = conf.get("spark.deploy.recoveryMode", "NONE")
- private val MAX_EXECUTOR_RETRIES = conf.getInt("spark.deploy.maxExecutorRetries", 10)
+ private val workerTimeoutMs = conf.get(WORKER_TIMEOUT) * 1000
+ private val retainedApplications = conf.get(RETAINED_APPLICATIONS)
+ private val retainedDrivers = conf.get(RETAINED_DRIVERS)
+ private val reaperIterations = conf.get(REAPER_ITERATIONS)
+ private val recoveryMode = conf.get(RECOVERY_MODE)
+ private val maxExecutorRetries = conf.get(MAX_EXECUTOR_RETRIES)
val workers = new HashSet[WorkerInfo]
val idToApp = new HashMap[String, ApplicationInfo]
@@ -113,13 +115,13 @@ private[deploy] class Master(
// As a temporary workaround before better ways of configuring memory, we allow users to set
// a flag that will perform round-robin scheduling across the nodes (spreading out each app
// among all the nodes) instead of trying to consolidate each app onto a small # of nodes.
- private val spreadOutApps = conf.getBoolean("spark.deploy.spreadOut", true)
+ private val spreadOutApps = conf.get(SPREAD_OUT_APPS)
// Default maxCores for applications that don't specify it (i.e. pass Int.MaxValue)
- private val defaultCores = conf.getInt("spark.deploy.defaultCores", Int.MaxValue)
+ private val defaultCores = conf.get(DEFAULT_CORES)
val reverseProxy = conf.get(UI_REVERSE_PROXY)
if (defaultCores < 1) {
- throw new SparkException("spark.deploy.defaultCores must be positive")
+ throw new SparkException(s"${DEFAULT_CORES.key} must be positive")
}
// Alternative application submission gateway that is stable across Spark versions
@@ -151,7 +153,7 @@ private[deploy] class Master(
override def run(): Unit = Utils.tryLogNonFatalError {
self.send(CheckForWorkerTimeOut)
}
- }, 0, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS)
+ }, 0, workerTimeoutMs, TimeUnit.MILLISECONDS)
if (restServerEnabled) {
val port = conf.get(MASTER_REST_SERVER_PORT)
@@ -168,7 +170,7 @@ private[deploy] class Master(
applicationMetricsSystem.getServletHandlers.foreach(webUi.attachHandler)
val serializer = new JavaSerializer(conf)
- val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match {
+ val (persistenceEngine_, leaderElectionAgent_) = recoveryMode match {
case "ZOOKEEPER" =>
logInfo("Persisting recovery state to ZooKeeper")
val zkFactory =
@@ -179,7 +181,7 @@ private[deploy] class Master(
new FileSystemRecoveryModeFactory(conf, serializer)
(fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this))
case "CUSTOM" =>
- val clazz = Utils.classForName(conf.get("spark.deploy.recoveryMode.factory"))
+ val clazz = Utils.classForName(conf.get(RECOVERY_MODE_FACTORY))
val factory = clazz.getConstructor(classOf[SparkConf], classOf[Serializer])
.newInstance(conf, serializer)
.asInstanceOf[StandaloneRecoveryModeFactory]
@@ -233,7 +235,7 @@ private[deploy] class Master(
override def run(): Unit = Utils.tryLogNonFatalError {
self.send(CompleteRecovery)
}
- }, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS)
+ }, workerTimeoutMs, TimeUnit.MILLISECONDS)
}
case CompleteRecovery => completeRecovery()
@@ -311,8 +313,8 @@ private[deploy] class Master(
// Important note: this code path is not exercised by tests, so be very careful when
// changing this `if` condition.
if (!normalExit
- && appInfo.incrementRetryCount() >= MAX_EXECUTOR_RETRIES
- && MAX_EXECUTOR_RETRIES >= 0) { // < 0 disables this application-killing path
+ && appInfo.incrementRetryCount() >= maxExecutorRetries
+ && maxExecutorRetries >= 0) { // < 0 disables this application-killing path
val execs = appInfo.executors.values
if (!execs.exists(_.state == ExecutorState.RUNNING)) {
logError(s"Application ${appInfo.desc.name} with ID ${appInfo.id} failed " +
@@ -870,8 +872,8 @@ private[deploy] class Master(
endpointToApp -= app.driver
addressToApp -= app.driver.address
- if (completedApps.size >= RETAINED_APPLICATIONS) {
- val toRemove = math.max(RETAINED_APPLICATIONS / 10, 1)
+ if (completedApps.size >= retainedApplications) {
+ val toRemove = math.max(retainedApplications / 10, 1)
completedApps.take(toRemove).foreach { a =>
applicationMetricsSystem.removeSource(a.appSource)
}
@@ -989,14 +991,14 @@ private[deploy] class Master(
private def timeOutDeadWorkers() {
// Copy the workers into an array so we don't modify the hashset while iterating through it
val currentTime = System.currentTimeMillis()
- val toRemove = workers.filter(_.lastHeartbeat < currentTime - WORKER_TIMEOUT_MS).toArray
+ val toRemove = workers.filter(_.lastHeartbeat < currentTime - workerTimeoutMs).toArray
for (worker <- toRemove) {
if (worker.state != WorkerState.DEAD) {
logWarning("Removing %s because we got no heartbeat in %d seconds".format(
- worker.id, WORKER_TIMEOUT_MS / 1000))
- removeWorker(worker, s"Not receiving heartbeat for ${WORKER_TIMEOUT_MS / 1000} seconds")
+ worker.id, workerTimeoutMs / 1000))
+ removeWorker(worker, s"Not receiving heartbeat for ${workerTimeoutMs / 1000} seconds")
} else {
- if (worker.lastHeartbeat < currentTime - ((REAPER_ITERATIONS + 1) * WORKER_TIMEOUT_MS)) {
+ if (worker.lastHeartbeat < currentTime - ((reaperIterations + 1) * workerTimeoutMs)) {
workers -= worker // we've seen this DEAD worker in the UI, etc. for long enough; cull it
}
}
@@ -1031,8 +1033,8 @@ private[deploy] class Master(
case Some(driver) =>
logInfo(s"Removing driver: $driverId")
drivers -= driver
- if (completedDrivers.size >= RETAINED_DRIVERS) {
- val toRemove = math.max(RETAINED_DRIVERS / 10, 1)
+ if (completedDrivers.size >= retainedDrivers) {
+ val toRemove = math.max(retainedDrivers / 10, 1)
completedDrivers.trimStart(toRemove)
}
completedDrivers += driver
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/RecoveryModeFactory.scala b/core/src/main/scala/org/apache/spark/deploy/master/RecoveryModeFactory.scala
index ffdd635..4707987 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/RecoveryModeFactory.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/RecoveryModeFactory.scala
@@ -20,6 +20,7 @@ package org.apache.spark.deploy.master
import org.apache.spark.SparkConf
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.Deploy.RECOVERY_DIRECTORY
import org.apache.spark.serializer.Serializer
/**
@@ -52,11 +53,11 @@ abstract class StandaloneRecoveryModeFactory(conf: SparkConf, serializer: Serial
private[master] class FileSystemRecoveryModeFactory(conf: SparkConf, serializer: Serializer)
extends StandaloneRecoveryModeFactory(conf, serializer) with Logging {
- val RECOVERY_DIR = conf.get("spark.deploy.recoveryDirectory", "")
+ val recoveryDir = conf.get(RECOVERY_DIRECTORY)
def createPersistenceEngine(): PersistenceEngine = {
- logInfo("Persisting recovery state to directory: " + RECOVERY_DIR)
- new FileSystemPersistenceEngine(RECOVERY_DIR, serializer)
+ logInfo("Persisting recovery state to directory: " + recoveryDir)
+ new FileSystemPersistenceEngine(recoveryDir, serializer)
}
def createLeaderElectionAgent(master: LeaderElectable): LeaderElectionAgent = {
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala
index 1e8dabf..47f3091 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala
@@ -23,11 +23,12 @@ import org.apache.curator.framework.recipes.leader.{LeaderLatch, LeaderLatchList
import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkCuratorUtil
import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.Deploy.ZOOKEEPER_DIRECTORY
private[master] class ZooKeeperLeaderElectionAgent(val masterInstance: LeaderElectable,
conf: SparkConf) extends LeaderLatchListener with LeaderElectionAgent with Logging {
- val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark") + "/leader_election"
+ val workingDir = conf.get(ZOOKEEPER_DIRECTORY).getOrElse("/spark") + "/leader_election"
private var zk: CuratorFramework = _
private var leaderLatch: LeaderLatch = _
@@ -38,7 +39,7 @@ private[master] class ZooKeeperLeaderElectionAgent(val masterInstance: LeaderEle
private def start() {
logInfo("Starting ZooKeeper LeaderElection agent")
zk = SparkCuratorUtil.newClient(conf)
- leaderLatch = new LeaderLatch(zk, WORKING_DIR)
+ leaderLatch = new LeaderLatch(zk, workingDir)
leaderLatch.addListener(this)
leaderLatch.start()
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
index af850e4..73dd0de 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
@@ -28,6 +28,7 @@ import org.apache.zookeeper.CreateMode
import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkCuratorUtil
import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.Deploy._
import org.apache.spark.serializer.Serializer
@@ -35,22 +36,22 @@ private[master] class ZooKeeperPersistenceEngine(conf: SparkConf, val serializer
extends PersistenceEngine
with Logging {
- private val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark") + "/master_status"
+ private val workingDir = conf.get(ZOOKEEPER_DIRECTORY).getOrElse("/spark") + "/master_status"
private val zk: CuratorFramework = SparkCuratorUtil.newClient(conf)
- SparkCuratorUtil.mkdir(zk, WORKING_DIR)
+ SparkCuratorUtil.mkdir(zk, workingDir)
override def persist(name: String, obj: Object): Unit = {
- serializeIntoFile(WORKING_DIR + "/" + name, obj)
+ serializeIntoFile(workingDir + "/" + name, obj)
}
override def unpersist(name: String): Unit = {
- zk.delete().forPath(WORKING_DIR + "/" + name)
+ zk.delete().forPath(workingDir + "/" + name)
}
override def read[T: ClassTag](prefix: String): Seq[T] = {
- zk.getChildren.forPath(WORKING_DIR).asScala
+ zk.getChildren.forPath(workingDir).asScala
.filter(_.startsWith(prefix)).flatMap(deserializeFromFile[T])
}
@@ -66,13 +67,13 @@ private[master] class ZooKeeperPersistenceEngine(conf: SparkConf, val serializer
}
private def deserializeFromFile[T](filename: String)(implicit m: ClassTag[T]): Option[T] = {
- val fileData = zk.getData().forPath(WORKING_DIR + "/" + filename)
+ val fileData = zk.getData().forPath(workingDir + "/" + filename)
try {
Some(serializer.newInstance().deserialize[T](ByteBuffer.wrap(fileData)))
} catch {
case e: Exception =>
logWarning("Exception while reading persisted file, deleting", e)
- zk.delete().forPath(WORKING_DIR + "/" + filename)
+ zk.delete().forPath(workingDir + "/" + filename)
None
}
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
index a6d13d1..8c2a907 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
@@ -31,6 +31,7 @@ import org.apache.spark.deploy.DeployMessages.DriverStateChanged
import org.apache.spark.deploy.master.DriverState
import org.apache.spark.deploy.master.DriverState.DriverState
import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.Worker.WORKER_DRIVER_TERMINATE_TIMEOUT
import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.util.{Clock, ShutdownHookManager, SystemClock, Utils}
@@ -57,8 +58,7 @@ private[deploy] class DriverRunner(
@volatile private[worker] var finalException: Option[Exception] = None
// Timeout to wait for when trying to terminate a driver.
- private val DRIVER_TERMINATE_TIMEOUT_MS =
- conf.getTimeAsMs("spark.worker.driverTerminateTimeout", "10s")
+ private val driverTerminateTimeoutMs = conf.get(WORKER_DRIVER_TERMINATE_TIMEOUT)
// Decoupled for testing
def setClock(_clock: Clock): Unit = {
@@ -122,7 +122,7 @@ private[deploy] class DriverRunner(
killed = true
synchronized {
process.foreach { p =>
- val exitCode = Utils.terminateProcess(p, DRIVER_TERMINATE_TIMEOUT_MS)
+ val exitCode = Utils.terminateProcess(p, driverTerminateTimeoutMs)
if (exitCode.isEmpty) {
logWarning("Failed to terminate driver process: " + p +
". This process will likely be orphaned.")
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 8c3593c..115450b 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -39,6 +39,7 @@ import org.apache.spark.deploy.worker.ui.WorkerWebUI
import org.apache.spark.internal.{config, Logging}
import org.apache.spark.internal.config.Tests.IS_TESTING
import org.apache.spark.internal.config.UI._
+import org.apache.spark.internal.config.Worker._
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.rpc._
import org.apache.spark.util.{SparkUncaughtExceptionHandler, ThreadUtils, Utils}
@@ -74,7 +75,7 @@ private[deploy] class Worker(
// For worker and executor IDs
private def createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US)
// Send a heartbeat every (heartbeat timeout) / 4 milliseconds
- private val HEARTBEAT_MILLIS = conf.getLong("spark.worker.timeout", 60) * 1000 / 4
+ private val HEARTBEAT_MILLIS = conf.get(WORKER_TIMEOUT) * 1000 / 4
// Model retries to connect to the master, after Hadoop's model.
// The first six attempts to reconnect are in shorter intervals (between 5 and 15 seconds)
@@ -93,13 +94,11 @@ private[deploy] class Worker(
private val PROLONGED_REGISTRATION_RETRY_INTERVAL_SECONDS = (math.round(60
* REGISTRATION_RETRY_FUZZ_MULTIPLIER))
- private val CLEANUP_ENABLED = conf.getBoolean("spark.worker.cleanup.enabled", false)
+ private val CLEANUP_ENABLED = conf.get(WORKER_CLEANUP_ENABLED)
// How often worker will clean up old app folders
- private val CLEANUP_INTERVAL_MILLIS =
- conf.getLong("spark.worker.cleanup.interval", 60 * 30) * 1000
+ private val CLEANUP_INTERVAL_MILLIS = conf.get(WORKER_CLEANUP_INTERVAL) * 1000
// TTL for app folders/data; after TTL expires it will be cleaned up
- private val APP_DATA_RETENTION_SECONDS =
- conf.getLong("spark.worker.cleanup.appDataTtl", 7 * 24 * 3600)
+ private val APP_DATA_RETENTION_SECONDS = conf.get(APP_DATA_RETENTION)
// Whether or not cleanup the non-shuffle files on executor exits.
private val CLEANUP_NON_SHUFFLE_FILES_ENABLED =
@@ -111,8 +110,7 @@ private[deploy] class Worker(
* Whether to use the master address in `masterRpcAddresses` if possible. If it's disabled, Worker
* will just use the address received from Master.
*/
- private val preferConfiguredMasterAddress =
- conf.getBoolean("spark.worker.preferConfiguredMasterAddress", false)
+ private val preferConfiguredMasterAddress = conf.get(PREFER_CONFIGURED_MASTER_ADDRESS)
/**
* The master address to connect in case of failure. When the connection is broken, worker will
* use this address to connect. This is usually just one of `masterRpcAddresses`. However, when
@@ -143,10 +141,8 @@ private[deploy] class Worker(
val appDirectories = new HashMap[String, Seq[String]]
val finishedApps = new HashSet[String]
- val retainedExecutors = conf.getInt("spark.worker.ui.retainedExecutors",
- WorkerWebUI.DEFAULT_RETAINED_EXECUTORS)
- val retainedDrivers = conf.getInt("spark.worker.ui.retainedDrivers",
- WorkerWebUI.DEFAULT_RETAINED_DRIVERS)
+ val retainedExecutors = conf.get(WORKER_UI_RETAINED_EXECUTORS)
+ val retainedDrivers = conf.get(WORKER_UI_RETAINED_DRIVERS)
// The shuffle service is not actually started unless configured.
private val shuffleService = if (externalShuffleServiceSupplier != null) {
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala
index 5802812..8c87708 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala
@@ -22,6 +22,7 @@ import java.lang.management.ManagementFactory
import scala.annotation.tailrec
import org.apache.spark.SparkConf
+import org.apache.spark.internal.config.Worker._
import org.apache.spark.util.{IntParam, MemoryParam, Utils}
/**
@@ -59,9 +60,7 @@ private[worker] class WorkerArguments(args: Array[String], conf: SparkConf) {
// This mutates the SparkConf, so all accesses to it must be made after this line
propertiesFile = Utils.loadDefaultSparkProperties(conf, propertiesFile)
- if (conf.contains("spark.worker.ui.port")) {
- webUiPort = conf.get("spark.worker.ui.port").toInt
- }
+ conf.get(WORKER_UI_PORT).foreach { webUiPort = _ }
checkWorkerMemory()
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
index 5488695..96980c3 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
@@ -56,6 +56,4 @@ class WorkerWebUI(
private[worker] object WorkerWebUI {
val STATIC_RESOURCE_BASE = SparkUI.STATIC_RESOURCE_DIR
- val DEFAULT_RETAINED_DRIVERS = 1000
- val DEFAULT_RETAINED_EXECUTORS = 1000
}
diff --git a/core/src/main/scala/org/apache/spark/internal/config/Deploy.scala b/core/src/main/scala/org/apache/spark/internal/config/Deploy.scala
new file mode 100644
index 0000000..ceab957
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/internal/config/Deploy.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.internal.config
+
+private[spark] object Deploy {
+ val RECOVERY_MODE = ConfigBuilder("spark.deploy.recoveryMode")
+ .stringConf
+ .createWithDefault("NONE")
+
+ val RECOVERY_MODE_FACTORY = ConfigBuilder("spark.deploy.recoveryMode.factory")
+ .stringConf
+ .createWithDefault("")
+
+ val RECOVERY_DIRECTORY = ConfigBuilder("spark.deploy.recoveryDirectory")
+ .stringConf
+ .createWithDefault("")
+
+ val ZOOKEEPER_URL = ConfigBuilder("spark.deploy.zookeeper.url")
+ .doc(s"When `${RECOVERY_MODE.key}` is set to ZOOKEEPER, this " +
+ "configuration is used to set the zookeeper URL to connect to.")
+ .stringConf
+ .createOptional
+
+ val ZOOKEEPER_DIRECTORY = ConfigBuilder("spark.deploy.zookeeper.dir")
+ .stringConf
+ .createOptional
+
+ val RETAINED_APPLICATIONS = ConfigBuilder("spark.deploy.retainedApplications")
+ .intConf
+ .createWithDefault(200)
+
+ val RETAINED_DRIVERS = ConfigBuilder("spark.deploy.retainedDrivers")
+ .intConf
+ .createWithDefault(200)
+
+ val REAPER_ITERATIONS = ConfigBuilder("spark.dead.worker.persistence")
+ .intConf
+ .createWithDefault(15)
+
+ val MAX_EXECUTOR_RETRIES = ConfigBuilder("spark.deploy.maxExecutorRetries")
+ .intConf
+ .createWithDefault(10)
+
+ val SPREAD_OUT_APPS = ConfigBuilder("spark.deploy.spreadOut")
+ .booleanConf
+ .createWithDefault(true)
+
+ val DEFAULT_CORES = ConfigBuilder("spark.deploy.defaultCores")
+ .intConf
+ .createWithDefault(Int.MaxValue)
+
+
+}
diff --git a/core/src/main/scala/org/apache/spark/internal/config/Kryo.scala b/core/src/main/scala/org/apache/spark/internal/config/Kryo.scala
new file mode 100644
index 0000000..7873141
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/internal/config/Kryo.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.internal.config
+
+import org.apache.spark.network.util.ByteUnit
+
+private[spark] object Kryo {
+
+ val KRYO_REGISTRATION_REQUIRED = ConfigBuilder("spark.kryo.registrationRequired")
+ .booleanConf
+ .createWithDefault(false)
+
+ val KRYO_USER_REGISTRATORS = ConfigBuilder("spark.kryo.registrator")
+ .stringConf
+ .createOptional
+
+ val KRYO_CLASSES_TO_REGISTER = ConfigBuilder("spark.kryo.classesToRegister")
+ .stringConf
+ .toSequence
+ .createWithDefault(Nil)
+
+ val KRYO_USE_UNSAFE = ConfigBuilder("spark.kyro.unsafe")
+ .booleanConf
+ .createWithDefault(false)
+
+ val KRYO_USE_POOL = ConfigBuilder("spark.kyro.pool")
+ .booleanConf
+ .createWithDefault(true)
+
+ val KRYO_REFERENCE_TRACKING = ConfigBuilder("spark.kryo.referenceTracking")
+ .booleanConf
+ .createWithDefault(true)
+
+ val KRYO_SERIALIZER_BUFFER_SIZE = ConfigBuilder("spark.kryoserializer.buffer")
+ .bytesConf(ByteUnit.KiB)
+ .createWithDefaultString("64k")
+
+ val KRYO_SERIALIZER_MAX_BUFFER_SIZE = ConfigBuilder("spark.kryoserializer.buffer.max")
+ .bytesConf(ByteUnit.MiB)
+ .createWithDefaultString("64m")
+
+}
diff --git a/core/src/main/scala/org/apache/spark/internal/config/Worker.scala b/core/src/main/scala/org/apache/spark/internal/config/Worker.scala
new file mode 100644
index 0000000..47f7167
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/internal/config/Worker.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.internal.config
+
+import java.util.concurrent.TimeUnit
+
+private[spark] object Worker {
+ val WORKER_TIMEOUT = ConfigBuilder("spark.worker.timeout")
+ .longConf
+ .createWithDefault(60)
+
+ val WORKER_DRIVER_TERMINATE_TIMEOUT = ConfigBuilder("spark.worker.driverTerminateTimeout")
+ .timeConf(TimeUnit.MILLISECONDS)
+ .createWithDefaultString("10s")
+
+ val WORKER_CLEANUP_ENABLED = ConfigBuilder("spark.worker.cleanup.enabled")
+ .booleanConf
+ .createWithDefault(false)
+
+ val WORKER_CLEANUP_INTERVAL = ConfigBuilder("spark.worker.cleanup.interval")
+ .longConf
+ .createWithDefault(60 * 30)
+
+ val APP_DATA_RETENTION = ConfigBuilder("spark.worker.cleanup.appDataTtl")
+ .longConf
+ .createWithDefault(7 * 24 * 3600)
+
+ val PREFER_CONFIGURED_MASTER_ADDRESS = ConfigBuilder("spark.worker.preferConfiguredMasterAddress")
+ .booleanConf
+ .createWithDefault(false)
+
+ val WORKER_UI_PORT = ConfigBuilder("spark.worker.ui.port")
+ .intConf
+ .createOptional
+
+ val WORKER_UI_RETAINED_EXECUTORS = ConfigBuilder("spark.worker.ui.retainedExecutors")
+ .intConf
+ .createWithDefault(1000)
+
+ val WORKER_UI_RETAINED_DRIVERS = ConfigBuilder("spark.worker.ui.retainedDrivers")
+ .intConf
+ .createWithDefault(1000)
+
+ val UNCOMPRESSED_LOG_FILE_LENGTH_CACHE_SIZE_CONF =
+ ConfigBuilder("spark.worker.ui.compressedLogFileLengthCacheSize")
+ .intConf
+ .createWithDefault(100)
+}
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 0e78637..99ce220 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -961,4 +961,35 @@ package object config {
.intConf
.createWithDefault(4)
+ private[spark] val SERIALIZER = ConfigBuilder("spark.serializer")
+ .stringConf
+ .createWithDefault("org.apache.spark.serializer.JavaSerializer")
+
+ private[spark] val SERIALIZER_OBJECT_STREAM_RESET =
+ ConfigBuilder("spark.serializer.objectStreamReset")
+ .intConf
+ .createWithDefault(100)
+
+ private[spark] val SERIALIZER_EXTRA_DEBUG_INFO = ConfigBuilder("spark.serializer.extraDebugInfo")
+ .booleanConf
+ .createWithDefault(true)
+
+ private[spark] val JARS = ConfigBuilder("spark.jars")
+ .stringConf
+ .toSequence
+ .createWithDefault(Nil)
+
+ private[spark] val FILES = ConfigBuilder("spark.files")
+ .stringConf
+ .toSequence
+ .createWithDefault(Nil)
+
+ private[spark] val SUBMIT_DEPLOY_MODE = ConfigBuilder("spark.submit.deployMode")
+ .stringConf
+ .createWithDefault("client")
+
+ private[spark] val SUBMIT_PYTHON_FILES = ConfigBuilder("spark.submit.pyFiles")
+ .stringConf
+ .toSequence
+ .createWithDefault(Nil)
}
diff --git a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
index f60dcfd..70564ee 100644
--- a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
@@ -24,6 +24,7 @@ import scala.reflect.ClassTag
import org.apache.spark.SparkConf
import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.internal.config._
import org.apache.spark.util.{ByteBufferInputStream, ByteBufferOutputStream, Utils}
private[spark] class JavaSerializationStream(
@@ -137,8 +138,8 @@ private[spark] class JavaSerializerInstance(
*/
@DeveloperApi
class JavaSerializer(conf: SparkConf) extends Serializer with Externalizable {
- private var counterReset = conf.getInt("spark.serializer.objectStreamReset", 100)
- private var extraDebugInfo = conf.getBoolean("spark.serializer.extraDebugInfo", true)
+ private var counterReset = conf.get(SERIALIZER_OBJECT_STREAM_RESET)
+ private var extraDebugInfo = conf.get(SERIALIZER_EXTRA_DEBUG_INFO)
protected def this() = this(new SparkConf()) // For deserialization only
diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index 72ca0fb..2df133d 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -39,6 +39,7 @@ import org.roaringbitmap.RoaringBitmap
import org.apache.spark._
import org.apache.spark.api.python.PythonBroadcast
import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.Kryo._
import org.apache.spark.network.util.ByteUnit
import org.apache.spark.scheduler.{CompressedMapStatus, HighlyCompressedMapStatus}
import org.apache.spark.storage._
@@ -58,34 +59,34 @@ class KryoSerializer(conf: SparkConf)
with Logging
with Serializable {
- private val bufferSizeKb = conf.getSizeAsKb("spark.kryoserializer.buffer", "64k")
+ private val bufferSizeKb = conf.get(KRYO_SERIALIZER_BUFFER_SIZE)
if (bufferSizeKb >= ByteUnit.GiB.toKiB(2)) {
- throw new IllegalArgumentException("spark.kryoserializer.buffer must be less than " +
+ throw new IllegalArgumentException(s"${KRYO_SERIALIZER_BUFFER_SIZE.key} must be less than " +
s"2048 MiB, got: + ${ByteUnit.KiB.toMiB(bufferSizeKb)} MiB.")
}
private val bufferSize = ByteUnit.KiB.toBytes(bufferSizeKb).toInt
- val maxBufferSizeMb = conf.getSizeAsMb("spark.kryoserializer.buffer.max", "64m").toInt
+ val maxBufferSizeMb = conf.get(KRYO_SERIALIZER_MAX_BUFFER_SIZE).toInt
if (maxBufferSizeMb >= ByteUnit.GiB.toMiB(2)) {
- throw new IllegalArgumentException("spark.kryoserializer.buffer.max must be less than " +
- s"2048 MiB, got: + $maxBufferSizeMb MiB.")
+ throw new IllegalArgumentException(s"${KRYO_SERIALIZER_MAX_BUFFER_SIZE.key} must be less " +
+ s"than 2048 MiB, got: $maxBufferSizeMb MiB.")
}
private val maxBufferSize = ByteUnit.MiB.toBytes(maxBufferSizeMb).toInt
- private val referenceTracking = conf.getBoolean("spark.kryo.referenceTracking", true)
- private val registrationRequired = conf.getBoolean("spark.kryo.registrationRequired", false)
- private val userRegistrators = conf.get("spark.kryo.registrator", "")
- .split(',').map(_.trim)
+ private val referenceTracking = conf.get(KRYO_REFERENCE_TRACKING)
+ private val registrationRequired = conf.get(KRYO_REGISTRATION_REQUIRED)
+ private val userRegistrators = conf.get(KRYO_USER_REGISTRATORS)
+ .map(_.trim)
.filter(!_.isEmpty)
- private val classesToRegister = conf.get("spark.kryo.classesToRegister", "")
- .split(',').map(_.trim)
+ private val classesToRegister = conf.get(KRYO_CLASSES_TO_REGISTER)
+ .map(_.trim)
.filter(!_.isEmpty)
private val avroSchemas = conf.getAvroSchema
// whether to use unsafe based IO for serialization
- private val useUnsafe = conf.getBoolean("spark.kryo.unsafe", false)
- private val usePool = conf.getBoolean("spark.kryo.pool", true)
+ private val useUnsafe = conf.get(KRYO_USE_UNSAFE)
+ private val usePool = conf.get(KRYO_USE_POOL)
def newKryoOutput(): KryoOutput =
if (useUnsafe) {
@@ -407,7 +408,7 @@ private[spark] class KryoSerializerInstance(
} catch {
case e: KryoException if e.getMessage.startsWith("Buffer overflow") =>
throw new SparkException(s"Kryo serialization failed: ${e.getMessage}. To avoid this, " +
- "increase spark.kryoserializer.buffer.max value.", e)
+ s"increase ${KRYO_SERIALIZER_MAX_BUFFER_SIZE.key} value.", e)
} finally {
releaseKryo(kryo)
}
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 83d1b2b..7416559 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -62,6 +62,7 @@ import org.apache.spark.internal.{config, Logging}
import org.apache.spark.internal.config._
import org.apache.spark.internal.config.Tests.IS_TESTING
import org.apache.spark.internal.config.UI._
+import org.apache.spark.internal.config.Worker._
import org.apache.spark.launcher.SparkLauncher
import org.apache.spark.network.util.JavaUtils
import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance}
@@ -1457,16 +1458,12 @@ private[spark] object Utils extends Logging {
CallSite(shortForm, longForm)
}
- private val UNCOMPRESSED_LOG_FILE_LENGTH_CACHE_SIZE_CONF =
- "spark.worker.ui.compressedLogFileLengthCacheSize"
- private val DEFAULT_UNCOMPRESSED_LOG_FILE_LENGTH_CACHE_SIZE = 100
private var compressedLogFileLengthCache: LoadingCache[String, java.lang.Long] = null
private def getCompressedLogFileLengthCache(
sparkConf: SparkConf): LoadingCache[String, java.lang.Long] = this.synchronized {
if (compressedLogFileLengthCache == null) {
- val compressedLogFileLengthCacheSize = sparkConf.getInt(
- UNCOMPRESSED_LOG_FILE_LENGTH_CACHE_SIZE_CONF,
- DEFAULT_UNCOMPRESSED_LOG_FILE_LENGTH_CACHE_SIZE)
+ val compressedLogFileLengthCacheSize = sparkConf.get(
+ UNCOMPRESSED_LOG_FILE_LENGTH_CACHE_SIZE_CONF)
compressedLogFileLengthCache = CacheBuilder.newBuilder()
.maximumSize(compressedLogFileLengthCacheSize)
.build[String, java.lang.Long](new CacheLoader[String, java.lang.Long]() {
@@ -2535,8 +2532,7 @@ private[spark] object Utils extends Logging {
* has its own mechanism to distribute jars.
*/
def getUserJars(conf: SparkConf): Seq[String] = {
- val sparkJars = conf.getOption("spark.jars")
- sparkJars.map(_.split(",")).map(_.filter(_.nonEmpty)).toSeq.flatten
+ conf.get(JARS).filter(_.nonEmpty)
}
/**
diff --git a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
index 61da413..f8adaf5 100644
--- a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
@@ -27,6 +27,7 @@ import scala.concurrent.duration._
import org.scalatest.BeforeAndAfter
import org.scalatest.Matchers
+import org.apache.spark.internal.config.Deploy._
import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted, SparkListenerTaskEnd, SparkListenerTaskStart}
import org.apache.spark.util.ThreadUtils
@@ -256,7 +257,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
.set("spark.task.reaper.enabled", "true")
.set("spark.task.reaper.killTimeout", "-1")
.set("spark.task.reaper.PollingInterval", "1s")
- .set("spark.deploy.maxExecutorRetries", "1")
+ .set(MAX_EXECUTOR_RETRIES, 1)
sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
// Add a listener to release the semaphore once any tasks are launched.
diff --git a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
index ffa7042..3203f8f 100644
--- a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
@@ -23,6 +23,7 @@ import java.util.concurrent.{Callable, CyclicBarrier, Executors, ExecutorService
import org.scalatest.Matchers
import org.apache.spark.ShuffleSuite.NonJavaSerializableClass
+import org.apache.spark.internal.config.SERIALIZER
import org.apache.spark.internal.config.Tests.TEST_NO_STAGE_RETRY
import org.apache.spark.memory.TaskMemoryManager
import org.apache.spark.rdd.{CoGroupedRDD, OrderedRDDFunctions, RDD, ShuffledRDD, SubtractedRDD}
@@ -215,7 +216,7 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
test("sort with Java non serializable class - Kryo") {
// Use a local cluster with 2 processes to make sure there are both local and remote blocks
- val myConf = conf.clone().set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+ val myConf = conf.clone().set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer")
sc = new SparkContext("local-cluster[2,1,1024]", "test", myConf)
val a = sc.parallelize(1 to 10, 2)
val b = a.map { x =>
diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
index a8849ab..4071dd4 100644
--- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
@@ -28,6 +28,7 @@ import com.esotericsoftware.kryo.Kryo
import org.apache.spark.internal.config._
import org.apache.spark.internal.config.History._
+import org.apache.spark.internal.config.Kryo._
import org.apache.spark.network.util.ByteUnit
import org.apache.spark.serializer.{JavaSerializer, KryoRegistrator, KryoSerializer}
import org.apache.spark.util.{ResetSystemProperties, RpcUtils}
@@ -78,7 +79,7 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst
assert(conf.get("spark.master") === "local[3]")
assert(conf.get("spark.app.name") === "My app")
assert(conf.get("spark.home") === "/path")
- assert(conf.get("spark.jars") === "a.jar,b.jar")
+ assert(conf.get(JARS) === Seq("a.jar", "b.jar"))
assert(conf.get("spark.executorEnv.VAR1") === "value1")
assert(conf.get("spark.executorEnv.VAR2") === "value2")
assert(conf.get("spark.executorEnv.VAR3") === "value3")
@@ -86,7 +87,7 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst
// Test the Java-friendly versions of these too
conf.setJars(Array("c.jar", "d.jar"))
conf.setExecutorEnv(Array(("VAR4", "value4"), ("VAR5", "value5")))
- assert(conf.get("spark.jars") === "c.jar,d.jar")
+ assert(conf.get(JARS) === Seq("c.jar", "d.jar"))
assert(conf.get("spark.executorEnv.VAR4") === "value4")
assert(conf.get("spark.executorEnv.VAR5") === "value5")
}
@@ -182,19 +183,19 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst
}
test("register kryo classes through registerKryoClasses") {
- val conf = new SparkConf().set("spark.kryo.registrationRequired", "true")
+ val conf = new SparkConf().set(KRYO_REGISTRATION_REQUIRED, true)
conf.registerKryoClasses(Array(classOf[Class1], classOf[Class2]))
- assert(conf.get("spark.kryo.classesToRegister") ===
- classOf[Class1].getName + "," + classOf[Class2].getName)
+ assert(conf.get(KRYO_CLASSES_TO_REGISTER).toSet ===
+ Seq(classOf[Class1].getName, classOf[Class2].getName).toSet)
conf.registerKryoClasses(Array(classOf[Class3]))
- assert(conf.get("spark.kryo.classesToRegister") ===
- classOf[Class1].getName + "," + classOf[Class2].getName + "," + classOf[Class3].getName)
+ assert(conf.get(KRYO_CLASSES_TO_REGISTER).toSet ===
+ Seq(classOf[Class1].getName, classOf[Class2].getName, classOf[Class3].getName).toSet)
conf.registerKryoClasses(Array(classOf[Class2]))
- assert(conf.get("spark.kryo.classesToRegister") ===
- classOf[Class1].getName + "," + classOf[Class2].getName + "," + classOf[Class3].getName)
+ assert(conf.get(KRYO_CLASSES_TO_REGISTER).toSet ===
+ Seq(classOf[Class1].getName, classOf[Class2].getName, classOf[Class3].getName).toSet)
// Kryo doesn't expose a way to discover registered classes, but at least make sure this doesn't
// blow up.
@@ -205,12 +206,12 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst
}
test("register kryo classes through registerKryoClasses and custom registrator") {
- val conf = new SparkConf().set("spark.kryo.registrationRequired", "true")
+ val conf = new SparkConf().set(KRYO_REGISTRATION_REQUIRED, true)
conf.registerKryoClasses(Array(classOf[Class1]))
- assert(conf.get("spark.kryo.classesToRegister") === classOf[Class1].getName)
+ assert(conf.get(KRYO_CLASSES_TO_REGISTER).toSet === Seq(classOf[Class1].getName).toSet)
- conf.set("spark.kryo.registrator", classOf[CustomRegistrator].getName)
+ conf.set(KRYO_USER_REGISTRATORS, classOf[CustomRegistrator].getName)
// Kryo doesn't expose a way to discover registered classes, but at least make sure this doesn't
// blow up.
@@ -220,9 +221,9 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst
}
test("register kryo classes through conf") {
- val conf = new SparkConf().set("spark.kryo.registrationRequired", "true")
- conf.set("spark.kryo.classesToRegister", "java.lang.StringBuffer")
- conf.set("spark.serializer", classOf[KryoSerializer].getName)
+ val conf = new SparkConf().set(KRYO_REGISTRATION_REQUIRED, true)
+ conf.set(KRYO_CLASSES_TO_REGISTER, Seq("java.lang.StringBuffer"))
+ conf.set(SERIALIZER, classOf[KryoSerializer].getName)
// Kryo doesn't expose a way to discover registered classes, but at least make sure this doesn't
// blow up.
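The assertions above lean on typed ConfigEntry reads: a Boolean entry accepts a Boolean directly, and a Seq[String] entry returns a parsed sequence, which is why the comma-joined string comparisons are gone. A minimal sketch of the declaration pattern the new Kryo.scala presumably follows (the entry names match the commit; the defaults shown are assumptions):

    private[spark] val KRYO_REGISTRATION_REQUIRED =
      ConfigBuilder("spark.kryo.registrationRequired")
        .booleanConf
        .createWithDefault(false)  // assumed default

    private[spark] val KRYO_CLASSES_TO_REGISTER =
      ConfigBuilder("spark.kryo.classesToRegister")
        .stringConf
        .toSequence                // comma-separated string parsed to Seq[String]
        .createWithDefault(Nil)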
diff --git a/core/src/test/scala/org/apache/spark/api/python/PythonBroadcastSuite.scala b/core/src/test/scala/org/apache/spark/api/python/PythonBroadcastSuite.scala
index 7407a65..24004de 100644
--- a/core/src/test/scala/org/apache/spark/api/python/PythonBroadcastSuite.scala
+++ b/core/src/test/scala/org/apache/spark/api/python/PythonBroadcastSuite.scala
@@ -24,6 +24,7 @@ import scala.io.Source
import org.scalatest.Matchers
import org.apache.spark.{SharedSparkContext, SparkConf, SparkFunSuite}
+import org.apache.spark.internal.config.Kryo._
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.util.Utils
@@ -48,7 +49,7 @@ class PythonBroadcastSuite extends SparkFunSuite with Matchers with SharedSparkC
}
val broadcast = new PythonBroadcast(broadcastDataFile.getAbsolutePath)
assertBroadcastIsValid(broadcast)
- val conf = new SparkConf().set("spark.kryo.registrationRequired", "true")
+ val conf = new SparkConf().set(KRYO_REGISTRATION_REQUIRED, true)
val deserializedBroadcast =
Utils.clone[PythonBroadcast](broadcast, new KryoSerializer(conf).newInstance())
assertBroadcastIsValid(deserializedBroadcast)
diff --git a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
index 6d74812..18ec60d 100644
--- a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
+++ b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
@@ -25,6 +25,7 @@ import org.scalatest.Assertions
import org.apache.spark._
import org.apache.spark.internal.config
+import org.apache.spark.internal.config.SERIALIZER
import org.apache.spark.io.SnappyCompressionCodec
import org.apache.spark.rdd.RDD
import org.apache.spark.security.EncryptionFunSuite
@@ -68,7 +69,7 @@ class BroadcastSuite extends SparkFunSuite with LocalSparkContext with Encryptio
encryptionTest("Accessing TorrentBroadcast variables in a local cluster") { conf =>
val numSlaves = 4
- conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+ conf.set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer")
conf.set(config.BROADCAST_COMPRESS, true)
sc = new SparkContext("local-cluster[%d, 1, 1024]".format(numSlaves), "test", conf)
val list = List[Int](1, 2, 3, 4)
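SERIALIZER here is a plain string-typed entry, so setting it by entry rather than by key is a one-for-one swap. A sketch of the likely declaration in the internal config package (the default shown is an assumption):

    private[spark] val SERIALIZER =
      ConfigBuilder("spark.serializer")
        .stringConf
        .createWithDefault("org.apache.spark.serializer.JavaSerializer")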
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index c6e961e..30efbb0 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -221,7 +221,7 @@ class SparkSubmitSuite
val (_, _, conf, _) = submit.prepareSubmitEnvironment(appArgs)
appArgs.deployMode should be ("client")
- conf.get("spark.submit.deployMode") should be ("client")
+ conf.get(SUBMIT_DEPLOY_MODE) should be ("client")
// Both cmd line and configuration are specified, cmdline option takes the priority
val clArgs1 = Seq(
@@ -235,7 +235,7 @@ class SparkSubmitSuite
val (_, _, conf1, _) = submit.prepareSubmitEnvironment(appArgs1)
appArgs1.deployMode should be ("cluster")
- conf1.get("spark.submit.deployMode") should be ("cluster")
+ conf1.get(SUBMIT_DEPLOY_MODE) should be ("cluster")
// Neither cmdline nor configuration are specified, client mode is the default choice
val clArgs2 = Seq(
@@ -248,7 +248,7 @@ class SparkSubmitSuite
val (_, _, conf2, _) = submit.prepareSubmitEnvironment(appArgs2)
appArgs2.deployMode should be ("client")
- conf2.get("spark.submit.deployMode") should be ("client")
+ conf2.get(SUBMIT_DEPLOY_MODE) should be ("client")
}
test("handles YARN cluster mode") {
@@ -374,12 +374,12 @@ class SparkSubmitSuite
val confMap = conf.getAll.toMap
confMap.keys should contain ("spark.master")
confMap.keys should contain ("spark.app.name")
- confMap.keys should contain ("spark.jars")
+ confMap.keys should contain (JARS.key)
confMap.keys should contain ("spark.driver.memory")
confMap.keys should contain ("spark.driver.cores")
confMap.keys should contain ("spark.driver.supervise")
confMap.keys should contain (UI_ENABLED.key)
- confMap.keys should contain ("spark.submit.deployMode")
+ confMap.keys should contain (SUBMIT_DEPLOY_MODE.key)
conf.get(UI_ENABLED) should be (false)
}
@@ -467,7 +467,7 @@ class SparkSubmitSuite
val (_, _, conf, mainClass) = submit.prepareSubmitEnvironment(appArgs)
conf.get("spark.executor.memory") should be ("5g")
conf.get("spark.master") should be ("yarn")
- conf.get("spark.submit.deployMode") should be ("cluster")
+ conf.get(SUBMIT_DEPLOY_MODE) should be ("cluster")
mainClass should be (SparkSubmit.YARN_CLUSTER_SUBMIT_CLASS)
}
@@ -662,7 +662,7 @@ class SparkSubmitSuite
val (_, _, conf, _) = submit.prepareSubmitEnvironment(appArgs)
appArgs.jars should be(Utils.resolveURIs(jars))
appArgs.files should be(Utils.resolveURIs(files))
- conf.get("spark.jars") should be(Utils.resolveURIs(jars + ",thejar.jar"))
+ conf.get(JARS) should be(Utils.resolveURIs(jars + ",thejar.jar").split(",").toSeq)
conf.get("spark.files") should be(Utils.resolveURIs(files))
// Test files and archives (Yarn)
@@ -692,8 +692,8 @@ class SparkSubmitSuite
val appArgs3 = new SparkSubmitArguments(clArgs3)
val (_, _, conf3, _) = submit.prepareSubmitEnvironment(appArgs3)
appArgs3.pyFiles should be(Utils.resolveURIs(pyFiles))
- conf3.get("spark.submit.pyFiles") should be(
- PythonRunner.formatPaths(Utils.resolveURIs(pyFiles)).mkString(","))
+ conf3.get(SUBMIT_PYTHON_FILES) should be(
+ PythonRunner.formatPaths(Utils.resolveURIs(pyFiles)))
conf3.get(PYSPARK_DRIVER_PYTHON.key) should be("python3.4")
conf3.get(PYSPARK_PYTHON.key) should be("python3.5")
}
@@ -744,8 +744,8 @@ class SparkSubmitSuite
)
val appArgs = new SparkSubmitArguments(clArgs)
val (_, _, conf, _) = submit.prepareSubmitEnvironment(appArgs)
- conf.get("spark.jars") should be(Utils.resolveURIs(jars + ",thejar.jar"))
- conf.get("spark.files") should be(Utils.resolveURIs(files))
+ conf.get(JARS) should be(Utils.resolveURIs(jars + ",thejar.jar").split(",").toSeq)
+ conf.get(FILES) should be(Utils.resolveURIs(files).split(",").toSeq)
// Test files and archives (Yarn)
val f2 = File.createTempFile("test-submit-files-archives", "", tmpDir)
@@ -776,8 +776,8 @@ class SparkSubmitSuite
)
val appArgs3 = new SparkSubmitArguments(clArgs3)
val (_, _, conf3, _) = submit.prepareSubmitEnvironment(appArgs3)
- conf3.get("spark.submit.pyFiles") should be(
- PythonRunner.formatPaths(Utils.resolveURIs(pyFiles)).mkString(","))
+ conf3.get(SUBMIT_PYTHON_FILES) should be(
+ PythonRunner.formatPaths(Utils.resolveURIs(pyFiles)))
// Test remote python files
val hadoopConf = new Configuration()
@@ -798,7 +798,7 @@ class SparkSubmitSuite
val appArgs4 = new SparkSubmitArguments(clArgs4)
val (_, _, conf4, _) = submit.prepareSubmitEnvironment(appArgs4, conf = Some(hadoopConf))
// Should not format python path for yarn cluster mode
- conf4.get("spark.submit.pyFiles") should be(Utils.resolveURIs(remotePyFiles))
+ conf4.get(SUBMIT_PYTHON_FILES) should be(Utils.resolveURIs(remotePyFiles).split(",").toSeq)
}
}
@@ -1024,7 +1024,7 @@ class SparkSubmitSuite
conf.get("spark.repl.local.jars") should (startWith("file:"))
// local py files should not be a URI format.
- conf.get("spark.submit.pyFiles") should (startWith("/"))
+ conf.get(SUBMIT_PYTHON_FILES).foreach { _ should (startWith("/")) }
}
}
@@ -1155,7 +1155,7 @@ class SparkSubmitSuite
val (_, _, conf, _) = submit.prepareSubmitEnvironment(appArgs, conf = Some(hadoopConf))
conf.get(PY_FILES.key) should be(s"s3a://${pyFile.getAbsolutePath}")
- conf.get("spark.submit.pyFiles") should (startWith("/"))
+ conf.get(SUBMIT_PYTHON_FILES).foreach { _ should (startWith("/")) }
// Verify "spark.submit.pyFiles"
val args1 = Seq(
@@ -1171,7 +1171,7 @@ class SparkSubmitSuite
val (_, _, conf1, _) = submit.prepareSubmitEnvironment(appArgs1, conf = Some(hadoopConf))
conf1.get(PY_FILES.key) should be(s"s3a://${pyFile.getAbsolutePath}")
- conf1.get("spark.submit.pyFiles") should (startWith("/"))
+ conf1.get(SUBMIT_PYTHON_FILES).foreach { _ should (startWith("/")) }
}
}
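The recurring .split(",").toSeq on the expected side reflects that JARS and FILES are Seq[String]-typed: conf.get returns the parsed list while Utils.resolveURIs still produces a comma-joined string. A sketch of the assumed declaration:

    private[spark] val JARS = ConfigBuilder("spark.jars")
      .stringConf
      .toSequence               // so conf.get(JARS) yields Seq("a.jar", "b.jar")
      .createWithDefault(Nil)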
diff --git a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
index 5904d03..fbf2acc 100644
--- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
@@ -40,7 +40,9 @@ import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
import org.apache.spark.deploy._
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.internal.config._
+import org.apache.spark.internal.config.Deploy._
import org.apache.spark.internal.config.UI._
+import org.apache.spark.internal.config.Worker._
import org.apache.spark.rpc.{RpcAddress, RpcEndpoint, RpcEndpointRef, RpcEnv}
import org.apache.spark.serializer
@@ -103,9 +105,8 @@ class MasterSuite extends SparkFunSuite
test("can use a custom recovery mode factory") {
val conf = new SparkConf(loadDefaults = false)
- conf.set("spark.deploy.recoveryMode", "CUSTOM")
- conf.set("spark.deploy.recoveryMode.factory",
- classOf[CustomRecoveryModeFactory].getCanonicalName)
+ conf.set(RECOVERY_MODE, "CUSTOM")
+ conf.set(RECOVERY_MODE_FACTORY, classOf[CustomRecoveryModeFactory].getCanonicalName)
conf.set(MASTER_REST_SERVER_ENABLED, false)
val instantiationAttempts = CustomRecoveryModeFactory.instantiationAttempts
@@ -188,9 +189,8 @@ class MasterSuite extends SparkFunSuite
test("master correctly recover the application") {
val conf = new SparkConf(loadDefaults = false)
- conf.set("spark.deploy.recoveryMode", "CUSTOM")
- conf.set("spark.deploy.recoveryMode.factory",
- classOf[FakeRecoveryModeFactory].getCanonicalName)
+ conf.set(RECOVERY_MODE, "CUSTOM")
+ conf.set(RECOVERY_MODE_FACTORY, classOf[FakeRecoveryModeFactory].getCanonicalName)
conf.set(MASTER_REST_SERVER_ENABLED, false)
val fakeAppInfo = makeAppInfo(1024)
@@ -637,7 +637,7 @@ class MasterSuite extends SparkFunSuite
}
test("SPARK-19900: there should be a corresponding driver for the app after relaunching driver") {
- val conf = new SparkConf().set("spark.worker.timeout", "1")
+ val conf = new SparkConf().set(WORKER_TIMEOUT, 1L)
val master = makeMaster(conf)
master.rpcEnv.setupEndpoint(Master.ENDPOINT_NAME, master)
eventually(timeout(10.seconds)) {
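RECOVERY_MODE's shape is visible further down in this commit, where a duplicate definition is deleted from the Mesos module; WORKER_TIMEOUT takes a Long, hence the 1L above. Sketches (the WORKER_TIMEOUT default is an assumption):

    private[spark] val RECOVERY_MODE =
      ConfigBuilder("spark.deploy.recoveryMode")
        .stringConf
        .createWithDefault("NONE")

    private[spark] val WORKER_TIMEOUT =
      ConfigBuilder("spark.worker.timeout")
        .longConf
        .createWithDefault(60)   // assumed default, in seconds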
diff --git a/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala
index 3027865..3d8a46b 100644
--- a/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala
@@ -24,6 +24,7 @@ import org.apache.commons.lang3.RandomUtils
import org.apache.curator.test.TestingServer
import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
+import org.apache.spark.internal.config.Deploy.ZOOKEEPER_URL
import org.apache.spark.rpc.{RpcEndpoint, RpcEnv}
import org.apache.spark.serializer.{JavaSerializer, Serializer}
import org.apache.spark.util.Utils
@@ -48,7 +49,7 @@ class PersistenceEngineSuite extends SparkFunSuite {
val zkTestServer = new TestingServer(findFreePort(conf))
try {
testPersistenceEngine(conf, serializer => {
- conf.set("spark.deploy.zookeeper.url", zkTestServer.getConnectString)
+ conf.set(ZOOKEEPER_URL, zkTestServer.getConnectString)
new ZooKeeperPersistenceEngine(conf, serializer)
})
} finally {
diff --git a/core/src/test/scala/org/apache/spark/deploy/rest/SubmitRestProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/rest/SubmitRestProtocolSuite.scala
index 75c50af..87655f3 100644
--- a/core/src/test/scala/org/apache/spark/deploy/rest/SubmitRestProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/rest/SubmitRestProtocolSuite.scala
@@ -22,6 +22,7 @@ import java.lang.Boolean
import org.json4s.jackson.JsonMethods._
import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.internal.config._
import org.apache.spark.util.Utils
/**
@@ -93,7 +94,7 @@ class SubmitRestProtocolSuite extends SparkFunSuite {
message.sparkProperties = conf.getAll.toMap
message.validate()
// optional fields
- conf.set("spark.jars", "mayonnaise.jar,ketchup.jar")
+ conf.set(JARS, Seq("mayonnaise.jar", "ketchup.jar"))
conf.set("spark.files", "fireball.png")
conf.set("spark.driver.memory", s"${Utils.DEFAULT_DRIVER_MEM_MB}m")
conf.set("spark.driver.cores", "180")
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala
index e5e5b5e..0ddf38c 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala
@@ -32,6 +32,7 @@ import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
import org.apache.spark.deploy.{Command, ExecutorState, ExternalShuffleService}
import org.apache.spark.deploy.DeployMessages.{DriverStateChanged, ExecutorStateChanged}
import org.apache.spark.deploy.master.DriverState
+import org.apache.spark.internal.config.Worker._
import org.apache.spark.rpc.{RpcAddress, RpcEnv}
class WorkerSuite extends SparkFunSuite with Matchers with BeforeAndAfter {
@@ -100,7 +101,7 @@ class WorkerSuite extends SparkFunSuite with Matchers with BeforeAndAfter {
test("test clearing of finishedExecutors (small number of executors)") {
val conf = new SparkConf()
- conf.set("spark.worker.ui.retainedExecutors", 2.toString)
+ conf.set(WORKER_UI_RETAINED_EXECUTORS, 2)
val worker = makeWorker(conf)
// initialize workers
for (i <- 0 until 5) {
@@ -124,7 +125,7 @@ class WorkerSuite extends SparkFunSuite with Matchers with BeforeAndAfter {
test("test clearing of finishedExecutors (more executors)") {
val conf = new SparkConf()
- conf.set("spark.worker.ui.retainedExecutors", 30.toString)
+ conf.set(WORKER_UI_RETAINED_EXECUTORS, 30)
val worker = makeWorker(conf)
// initialize workers
for (i <- 0 until 50) {
@@ -157,7 +158,7 @@ class WorkerSuite extends SparkFunSuite with Matchers with BeforeAndAfter {
test("test clearing of finishedDrivers (small number of drivers)") {
val conf = new SparkConf()
- conf.set("spark.worker.ui.retainedDrivers", 2.toString)
+ conf.set(WORKER_UI_RETAINED_DRIVERS, 2)
val worker = makeWorker(conf)
// initialize workers
for (i <- 0 until 5) {
@@ -181,7 +182,7 @@ class WorkerSuite extends SparkFunSuite with Matchers with BeforeAndAfter {
test("test clearing of finishedDrivers (more drivers)") {
val conf = new SparkConf()
- conf.set("spark.worker.ui.retainedDrivers", 30.toString)
+ conf.set(WORKER_UI_RETAINED_DRIVERS, 30)
val worker = makeWorker(conf)
// initialize workers
for (i <- 0 until 50) {
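The retained-UI entries take Ints directly, replacing the 2.toString workaround above. A sketch, assuming the historical default of 1000:

    private[spark] val WORKER_UI_RETAINED_EXECUTORS =
      ConfigBuilder("spark.worker.ui.retainedExecutors")
        .intConf
        .createWithDefault(1000)  // assumed default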
diff --git a/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
index f41ffb7..c1e7fb9 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
@@ -181,7 +181,7 @@ class MapStatusSuite extends SparkFunSuite {
test("SPARK-21133 HighlyCompressedMapStatus#writeExternal throws NPE") {
val conf = new SparkConf()
- .set("spark.serializer", classOf[KryoSerializer].getName)
+ .set(config.SERIALIZER, classOf[KryoSerializer].getName)
.setMaster("local")
.setAppName("SPARK-21133")
withSpark(new SparkContext(conf)) { sc =>
diff --git a/core/src/test/scala/org/apache/spark/serializer/GenericAvroSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/GenericAvroSerializerSuite.scala
index 3734f1c..8610b18 100644
--- a/core/src/test/scala/org/apache/spark/serializer/GenericAvroSerializerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/GenericAvroSerializerSuite.scala
@@ -25,9 +25,10 @@ import org.apache.avro.{Schema, SchemaBuilder}
import org.apache.avro.generic.GenericData.Record
import org.apache.spark.{SharedSparkContext, SparkFunSuite}
+import org.apache.spark.internal.config.SERIALIZER
class GenericAvroSerializerSuite extends SparkFunSuite with SharedSparkContext {
- conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+ conf.set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer")
val schema : Schema = SchemaBuilder
.record("testRecord").fields()
diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoBenchmark.scala b/core/src/test/scala/org/apache/spark/serializer/KryoBenchmark.scala
index d7730f2..fd228cd 100644
--- a/core/src/test/scala/org/apache/spark/serializer/KryoBenchmark.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/KryoBenchmark.scala
@@ -22,6 +22,8 @@ import scala.util.Random
import org.apache.spark.SparkConf
import org.apache.spark.benchmark.{Benchmark, BenchmarkBase}
+import org.apache.spark.internal.config._
+import org.apache.spark.internal.config.Kryo._
import org.apache.spark.serializer.KryoTest._
/**
@@ -122,9 +124,9 @@ object KryoBenchmark extends BenchmarkBase {
def createSerializer(useUnsafe: Boolean): SerializerInstance = {
val conf = new SparkConf()
- conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
- conf.set("spark.kryo.registrator", classOf[MyRegistrator].getName)
- conf.set("spark.kryo.unsafe", useUnsafe.toString)
+ conf.set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer")
+ conf.set(KRYO_USER_REGISTRATORS, classOf[MyRegistrator].getName)
+ conf.set(KRYO_USE_UNSAFE, useUnsafe)
new KryoSerializer(conf).newInstance()
}
diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerBenchmark.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerBenchmark.scala
index 2a15c6f..2915b99 100644
--- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerBenchmark.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerBenchmark.scala
@@ -23,6 +23,8 @@ import scala.concurrent.duration._
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.benchmark.{Benchmark, BenchmarkBase}
+import org.apache.spark.internal.config._
+import org.apache.spark.internal.config.Kryo._
import org.apache.spark.serializer.KryoTest._
import org.apache.spark.util.ThreadUtils
@@ -69,9 +71,9 @@ object KryoSerializerBenchmark extends BenchmarkBase {
def createSparkContext(usePool: Boolean): SparkContext = {
val conf = new SparkConf()
- conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
- conf.set("spark.kryo.registrator", classOf[MyRegistrator].getName)
- conf.set("spark.kryo.pool", usePool.toString)
+ conf.set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer")
+ conf.set(KRYO_USER_REGISTRATORS, classOf[MyRegistrator].getName)
+ conf.set(KRYO_USE_POOL, usePool)
if (sc != null) {
sc.stop()
diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerDistributedSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerDistributedSuite.scala
index 46aa9c3..ae87109 100644
--- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerDistributedSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerDistributedSuite.scala
@@ -28,8 +28,8 @@ class KryoSerializerDistributedSuite extends SparkFunSuite with LocalSparkContex
test("kryo objects are serialised consistently in different processes") {
val conf = new SparkConf(false)
- .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
- .set("spark.kryo.registrator", classOf[AppJarRegistrator].getName)
+ .set(config.SERIALIZER, "org.apache.spark.serializer.KryoSerializer")
+ .set(config.Kryo.KRYO_USER_REGISTRATORS, classOf[AppJarRegistrator].getName)
.set(config.MAX_TASK_FAILURES, 1)
.set(config.BLACKLIST_ENABLED, false)
diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerResizableOutputSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerResizableOutputSuite.scala
index cf01f79..25f0b19 100644
--- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerResizableOutputSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerResizableOutputSuite.scala
@@ -21,6 +21,8 @@ import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.LocalSparkContext._
import org.apache.spark.SparkContext
import org.apache.spark.SparkException
+import org.apache.spark.internal.config._
+import org.apache.spark.internal.config.Kryo._
class KryoSerializerResizableOutputSuite extends SparkFunSuite {
@@ -29,9 +31,9 @@ class KryoSerializerResizableOutputSuite extends SparkFunSuite {
test("kryo without resizable output buffer should fail on large array") {
val conf = new SparkConf(false)
- conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
- conf.set("spark.kryoserializer.buffer", "1m")
- conf.set("spark.kryoserializer.buffer.max", "1m")
+ conf.set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer")
+ conf.set(KRYO_SERIALIZER_BUFFER_SIZE.key, "1m")
+ conf.set(KRYO_SERIALIZER_MAX_BUFFER_SIZE.key, "1m")
withSpark(new SparkContext("local", "test", conf)) { sc =>
intercept[SparkException](sc.parallelize(x).collect())
}
@@ -39,9 +41,9 @@ class KryoSerializerResizableOutputSuite extends SparkFunSuite {
test("kryo with resizable output buffer should succeed on large array") {
val conf = new SparkConf(false)
- conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
- conf.set("spark.kryoserializer.buffer", "1m")
- conf.set("spark.kryoserializer.buffer.max", "2m")
+ conf.set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer")
+ conf.set(KRYO_SERIALIZER_BUFFER_SIZE.key, "1m")
+ conf.set(KRYO_SERIALIZER_MAX_BUFFER_SIZE.key, "2m")
withSpark(new SparkContext("local", "test", conf)) { sc =>
assert(sc.parallelize(x).collect() === x)
}
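These two settings go through .key with a size string rather than a typed value, which suggests the buffer entries are byte-size typed and the tests prefer the human-readable "1m"/"2m" form. A sketch of the assumed shape:

    private[spark] val KRYO_SERIALIZER_BUFFER_SIZE =
      ConfigBuilder("spark.kryoserializer.buffer")
        .bytesConf(ByteUnit.KiB)          // assumed unit
        .createWithDefaultString("64k")   // assumed default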
diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
index 8af5327..41fb405 100644
--- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
@@ -32,19 +32,21 @@ import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
import org.roaringbitmap.RoaringBitmap
import org.apache.spark.{SharedSparkContext, SparkConf, SparkFunSuite}
+import org.apache.spark.internal.config._
+import org.apache.spark.internal.config.Kryo._
import org.apache.spark.scheduler.HighlyCompressedMapStatus
import org.apache.spark.serializer.KryoTest._
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.util.{ThreadUtils, Utils}
class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
- conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
- conf.set("spark.kryo.registrator", classOf[MyRegistrator].getName)
- conf.set("spark.kryo.unsafe", "false")
+ conf.set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer")
+ conf.set(KRYO_USER_REGISTRATORS, classOf[MyRegistrator].getName)
+ conf.set(KRYO_USE_UNSAFE, false)
test("SPARK-7392 configuration limits") {
- val kryoBufferProperty = "spark.kryoserializer.buffer"
- val kryoBufferMaxProperty = "spark.kryoserializer.buffer.max"
+ val kryoBufferProperty = KRYO_SERIALIZER_BUFFER_SIZE.key
+ val kryoBufferMaxProperty = KRYO_SERIALIZER_MAX_BUFFER_SIZE.key
def newKryoInstance(
conf: SparkConf,
@@ -81,7 +83,7 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
test("basic types") {
val conf = new SparkConf(false)
- conf.set("spark.kryo.registrationRequired", "true")
+ conf.set(KRYO_REGISTRATION_REQUIRED, true)
val ser = new KryoSerializer(conf).newInstance()
def check[T: ClassTag](t: T) {
@@ -114,7 +116,7 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
test("pairs") {
val conf = new SparkConf(false)
- conf.set("spark.kryo.registrationRequired", "true")
+ conf.set(KRYO_REGISTRATION_REQUIRED, true)
val ser = new KryoSerializer(conf).newInstance()
def check[T: ClassTag](t: T) {
@@ -141,7 +143,7 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
test("Scala data structures") {
val conf = new SparkConf(false)
- conf.set("spark.kryo.registrationRequired", "true")
+ conf.set(KRYO_REGISTRATION_REQUIRED, true)
val ser = new KryoSerializer(conf).newInstance()
def check[T: ClassTag](t: T) {
@@ -169,7 +171,7 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
}
test("Bug: SPARK-10251") {
- val ser = new KryoSerializer(conf.clone.set("spark.kryo.registrationRequired", "true"))
+ val ser = new KryoSerializer(conf.clone.set(KRYO_REGISTRATION_REQUIRED, true))
.newInstance()
def check[T: ClassTag](t: T) {
assert(ser.deserialize[T](ser.serialize(t)) === t)
@@ -253,7 +255,7 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
hashMap.put("foo", "bar")
check(hashMap)
- System.clearProperty("spark.kryo.registrator")
+ System.clearProperty(KRYO_USER_REGISTRATORS.key)
}
test("kryo with collect") {
@@ -310,7 +312,7 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
import org.apache.spark.SparkException
val conf = new SparkConf(false)
- conf.set("spark.kryo.registrator", "this.class.does.not.exist")
+ conf.set(KRYO_USER_REGISTRATORS, "this.class.does.not.exist")
val thrown = intercept[SparkException](new KryoSerializer(conf).newInstance().serialize(1))
assert(thrown.getMessage.contains("Failed to register classes with Kryo"))
@@ -337,7 +339,7 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
test("registration of HighlyCompressedMapStatus") {
val conf = new SparkConf(false)
- conf.set("spark.kryo.registrationRequired", "true")
+ conf.set(KRYO_REGISTRATION_REQUIRED, true)
// these cases require knowing the internals of RoaringBitmap a little. Blocks span 2^16
// values, and they use a bitmap (dense) if they have more than 4096 values, and an
@@ -355,7 +357,7 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
test("serialization buffer overflow reporting") {
import org.apache.spark.SparkException
- val kryoBufferMaxProperty = "spark.kryoserializer.buffer.max"
+ val kryoBufferMaxProperty = KRYO_SERIALIZER_MAX_BUFFER_SIZE.key
val largeObject = (1 to 1000000).toArray
@@ -409,7 +411,7 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
test("getAutoReset") {
val ser = new KryoSerializer(new SparkConf).newInstance().asInstanceOf[KryoSerializerInstance]
assert(ser.getAutoReset)
- val conf = new SparkConf().set("spark.kryo.registrator",
+ val conf = new SparkConf().set(KRYO_USER_REGISTRATORS,
classOf[RegistratorWithoutAutoReset].getName)
val ser2 = new KryoSerializer(conf).newInstance().asInstanceOf[KryoSerializerInstance]
assert(!ser2.getAutoReset)
@@ -438,10 +440,10 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
private def testSerializerInstanceReuse(
autoReset: Boolean, referenceTracking: Boolean, usePool: Boolean): Unit = {
val conf = new SparkConf(loadDefaults = false)
- .set("spark.kryo.referenceTracking", referenceTracking.toString)
- .set("spark.kryo.pool", usePool.toString)
+ .set(KRYO_REFERENCE_TRACKING, referenceTracking)
+ .set(KRYO_USE_POOL, usePool)
if (!autoReset) {
- conf.set("spark.kryo.registrator", classOf[RegistratorWithoutAutoReset].getName)
+ conf.set(KRYO_USER_REGISTRATORS, classOf[RegistratorWithoutAutoReset].getName)
}
val ser = new KryoSerializer(conf)
val serInstance = ser.newInstance().asInstanceOf[KryoSerializerInstance]
@@ -478,7 +480,7 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
implicit val executionContext: ExecutionContext = ExecutionContext.fromExecutor(
Executors.newFixedThreadPool(4))
- val ser = new KryoSerializer(conf.clone.set("spark.kryo.pool", "true"))
+ val ser = new KryoSerializer(conf.clone.set(KRYO_USE_POOL, true))
val tests = mutable.ListBuffer[Future[Boolean]]()
@@ -519,9 +521,9 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
}
class KryoSerializerAutoResetDisabledSuite extends SparkFunSuite with SharedSparkContext {
- conf.set("spark.serializer", classOf[KryoSerializer].getName)
- conf.set("spark.kryo.registrator", classOf[RegistratorWithoutAutoReset].getName)
- conf.set("spark.kryo.referenceTracking", "true")
+ conf.set(SERIALIZER, classOf[KryoSerializer].getName)
+ conf.set(KRYO_USER_REGISTRATORS, classOf[RegistratorWithoutAutoReset].getName)
+ conf.set(KRYO_REFERENCE_TRACKING, true)
conf.set("spark.shuffle.manager", "sort")
conf.set("spark.shuffle.sort.bypassMergeThreshold", "200")
diff --git a/core/src/test/scala/org/apache/spark/serializer/SerializerPropertiesSuite.scala b/core/src/test/scala/org/apache/spark/serializer/SerializerPropertiesSuite.scala
index 99882bf..dad080c 100644
--- a/core/src/test/scala/org/apache/spark/serializer/SerializerPropertiesSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/SerializerPropertiesSuite.scala
@@ -24,6 +24,7 @@ import scala.util.Random
import org.scalatest.Assertions
import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.internal.config.Kryo._
import org.apache.spark.serializer.KryoTest.RegistratorWithoutAutoReset
/**
@@ -50,7 +51,7 @@ class SerializerPropertiesSuite extends SparkFunSuite {
}
test("KryoSerializer does not support relocation when auto-reset is disabled") {
- val conf = new SparkConf().set("spark.kryo.registrator",
+ val conf = new SparkConf().set(KRYO_USER_REGISTRATORS,
classOf[RegistratorWithoutAutoReset].getName)
val ser = new KryoSerializer(conf)
assert(!ser.newInstance().asInstanceOf[KryoSerializerInstance].getAutoReset())
diff --git a/core/src/test/scala/org/apache/spark/serializer/UnsafeKryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/UnsafeKryoSerializerSuite.scala
index d63a45a..126ba0e 100644
--- a/core/src/test/scala/org/apache/spark/serializer/UnsafeKryoSerializerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/UnsafeKryoSerializerSuite.scala
@@ -17,17 +17,19 @@
package org.apache.spark.serializer
+import org.apache.spark.internal.config.Kryo._
+
class UnsafeKryoSerializerSuite extends KryoSerializerSuite {
// This test suite should run all tests in KryoSerializerSuite with kryo unsafe.
override def beforeAll() {
- conf.set("spark.kryo.unsafe", "true")
+ conf.set(KRYO_USE_UNSAFE, true)
super.beforeAll()
}
override def afterAll() {
- conf.set("spark.kryo.unsafe", "false")
+ conf.set(KRYO_USE_UNSAFE, false)
super.afterAll()
}
}
diff --git a/core/src/test/scala/org/apache/spark/storage/FlatmapIteratorSuite.scala b/core/src/test/scala/org/apache/spark/storage/FlatmapIteratorSuite.scala
index 4282850..fc16fe3 100644
--- a/core/src/test/scala/org/apache/spark/storage/FlatmapIteratorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/FlatmapIteratorSuite.scala
@@ -17,7 +17,7 @@
package org.apache.spark.storage
import org.apache.spark._
-
+import org.apache.spark.internal.config._
class FlatmapIteratorSuite extends SparkFunSuite with LocalSparkContext {
/* Tests the ability of Spark to deal with user provided iterators from flatMap
@@ -55,7 +55,7 @@ class FlatmapIteratorSuite extends SparkFunSuite with LocalSparkContext {
test("Serializer Reset") {
val sconf = new SparkConf().setMaster("local").setAppName("serializer_reset_test")
- .set("spark.serializer.objectStreamReset", "10")
+ .set(SERIALIZER_OBJECT_STREAM_RESET, 10)
sc = new SparkContext(sconf)
val expand_size = 500
val data = sc.parallelize(Seq(1, 2)).
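SERIALIZER_OBJECT_STREAM_RESET is Int-typed, so the test passes 10 rather than "10". A sketch of the assumed declaration:

    private[spark] val SERIALIZER_OBJECT_STREAM_RESET =
      ConfigBuilder("spark.serializer.objectStreamReset")
        .intConf
        .createWithDefault(100)  // assumed default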
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index d3f94fb..7aca0ad 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -40,6 +40,7 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.{SparkConf, SparkException, SparkFunSuite, TaskContext}
import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
import org.apache.spark.network.util.ByteUnit
import org.apache.spark.scheduler.SparkListener
@@ -829,7 +830,7 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
test("isDynamicAllocationEnabled") {
val conf = new SparkConf()
conf.set("spark.master", "yarn")
- conf.set("spark.submit.deployMode", "client")
+ conf.set(SUBMIT_DEPLOY_MODE, "client")
assert(Utils.isDynamicAllocationEnabled(conf) === false)
assert(Utils.isDynamicAllocationEnabled(
conf.set("spark.dynamicAllocation.enabled", "false")) === false)
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
index 5efbf4e..de70153 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
@@ -54,8 +54,8 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite
val conf = new SparkConf(loadDefaults)
// Make the Java serializer write a reset instruction (TC_RESET) after each object to test
// for a bug we had with bytes written past the last object in a batch (SPARK-2792)
- conf.set("spark.serializer.objectStreamReset", "1")
- conf.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
+ conf.set(SERIALIZER_OBJECT_STREAM_RESET, 1)
+ conf.set(SERIALIZER, "org.apache.spark.serializer.JavaSerializer")
conf.set("spark.shuffle.spill.compress", codec.isDefined.toString)
conf.set("spark.shuffle.compress", codec.isDefined.toString)
codec.foreach { c => conf.set(IO_COMPRESSION_CODEC, c) }
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
index 14148e0..3006409 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
@@ -23,6 +23,7 @@ import scala.collection.mutable.ArrayBuffer
import scala.util.Random
import org.apache.spark._
+import org.apache.spark.internal.config._
import org.apache.spark.internal.config.Tests.TEST_MEMORY
import org.apache.spark.memory.MemoryTestingUtils
import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
@@ -268,12 +269,12 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
private def createSparkConf(loadDefaults: Boolean, kryo: Boolean): SparkConf = {
val conf = new SparkConf(loadDefaults)
if (kryo) {
- conf.set("spark.serializer", classOf[KryoSerializer].getName)
+ conf.set(SERIALIZER, classOf[KryoSerializer].getName)
} else {
// Make the Java serializer write a reset instruction (TC_RESET) after each object to test
// for a bug we had with bytes written past the last object in a batch (SPARK-2792)
- conf.set("spark.serializer.objectStreamReset", "1")
- conf.set("spark.serializer", classOf[JavaSerializer].getName)
+ conf.set(SERIALIZER_OBJECT_STREAM_RESET, 1)
+ conf.set(SERIALIZER, classOf[JavaSerializer].getName)
}
conf.set("spark.shuffle.sort.bypassMergeThreshold", "0")
// Ensure that we actually have multiple batches per spill file
diff --git a/mllib/src/test/scala/org/apache/spark/ml/attribute/AttributeSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/attribute/AttributeSuite.scala
index eb5f3ca..7f892fd 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/attribute/AttributeSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/attribute/AttributeSuite.scala
@@ -18,6 +18,7 @@
package org.apache.spark.ml.attribute
import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.internal.config.Kryo._
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.sql.types._
@@ -225,7 +226,7 @@ class AttributeSuite extends SparkFunSuite {
test("Kryo class register") {
val conf = new SparkConf(false)
- conf.set("spark.kryo.registrationRequired", "true")
+ conf.set(KRYO_REGISTRATION_REQUIRED, true)
val ser = new KryoSerializer(conf).newInstance()
diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/InstanceSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/InstanceSuite.scala
index cca7399..5a74490 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/feature/InstanceSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/feature/InstanceSuite.scala
@@ -18,13 +18,14 @@
package org.apache.spark.ml.feature
import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.internal.config.Kryo._
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.serializer.KryoSerializer
class InstanceSuite extends SparkFunSuite{
test("Kryo class register") {
val conf = new SparkConf(false)
- conf.set("spark.kryo.registrationRequired", "true")
+ conf.set(KRYO_REGISTRATION_REQUIRED, true)
val ser = new KryoSerializer(conf).newInstance()
diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/LabeledPointSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/LabeledPointSuite.scala
index 05c7a58..63c1635 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/feature/LabeledPointSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/feature/LabeledPointSuite.scala
@@ -18,13 +18,14 @@
package org.apache.spark.ml.feature
import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.internal.config.Kryo._
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.serializer.KryoSerializer
class LabeledPointSuite extends SparkFunSuite {
test("Kryo class register") {
val conf = new SparkConf(false)
- conf.set("spark.kryo.registrationRequired", "true")
+ conf.set(KRYO_REGISTRATION_REQUIRED, true)
val ser = new KryoSerializer(conf).newInstance()
diff --git a/mllib/src/test/scala/org/apache/spark/ml/tree/impl/TreePointSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/tree/impl/TreePointSuite.scala
index f41abe4..3a44e79 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/tree/impl/TreePointSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/tree/impl/TreePointSuite.scala
@@ -18,12 +18,13 @@
package org.apache.spark.ml.tree.impl
import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.internal.config.Kryo._
import org.apache.spark.serializer.KryoSerializer
class TreePointSuite extends SparkFunSuite {
test("Kryo class register") {
val conf = new SparkConf(false)
- conf.set("spark.kryo.registrationRequired", "true")
+ conf.set(KRYO_REGISTRATION_REQUIRED, true)
val ser = new KryoSerializer(conf).newInstance()
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansSuite.scala
index d18cef7..c4bf5b2 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.mllib.clustering
import scala.util.Random
import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.internal.config.Kryo._
import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vector, Vectors}
import org.apache.spark.mllib.util.{LocalClusterSparkContext, MLlibTestSparkContext}
import org.apache.spark.mllib.util.TestingUtils._
@@ -316,7 +317,7 @@ class KMeansSuite extends SparkFunSuite with MLlibTestSparkContext {
test("Kryo class register") {
val conf = new SparkConf(false)
- conf.set("spark.kryo.registrationRequired", "true")
+ conf.set(KRYO_REGISTRATION_REQUIRED, true)
val ser = new KryoSerializer(conf).newInstance()
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/feature/Word2VecSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/feature/Word2VecSuite.scala
index f4fa216..a679fe4 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/feature/Word2VecSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/feature/Word2VecSuite.scala
@@ -18,8 +18,10 @@
package org.apache.spark.mllib.feature
import org.apache.spark.SparkFunSuite
+import org.apache.spark.internal.config.Kryo._
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.util.MLlibTestSparkContext
+import org.apache.spark.sql.internal.SQLConf._
import org.apache.spark.util.Utils
class Word2VecSuite extends SparkFunSuite with MLlibTestSparkContext {
@@ -109,12 +111,16 @@ class Word2VecSuite extends SparkFunSuite with MLlibTestSparkContext {
test("big model load / save") {
// backing up old values
- val oldBufferConfValue = spark.conf.get("spark.kryoserializer.buffer.max", "64m")
- val oldBufferMaxConfValue = spark.conf.get("spark.kryoserializer.buffer", "64k")
+ val oldBufferConfValue = spark.conf.get(KRYO_SERIALIZER_BUFFER_SIZE.key, "64k")
+ val oldBufferMaxConfValue = spark.conf.get(KRYO_SERIALIZER_MAX_BUFFER_SIZE.key, "64m")
+ val oldSetCommandRejectsSparkCoreConfs = spark.conf.get(
+ SET_COMMAND_REJECTS_SPARK_CORE_CONFS.key, "true")
// setting test values to trigger partitioning
- spark.conf.set("spark.kryoserializer.buffer", "50b")
- spark.conf.set("spark.kryoserializer.buffer.max", "50b")
+
+ // this is needed to set configurations that are also defined in SparkConf
+ spark.conf.set(SET_COMMAND_REJECTS_SPARK_CORE_CONFS.key, "false")
+ spark.conf.set(KRYO_SERIALIZER_BUFFER_SIZE.key, "50b")
// create a model bigger than 50 Bytes
val word2VecMap = Map((0 to 10).map(i => s"$i" -> Array.fill(10)(0.1f)): _*)
@@ -137,8 +143,9 @@ class Word2VecSuite extends SparkFunSuite with MLlibTestSparkContext {
"that spans over multiple partitions", t)
} finally {
Utils.deleteRecursively(tempDir)
- spark.conf.set("spark.kryoserializer.buffer", oldBufferConfValue)
- spark.conf.set("spark.kryoserializer.buffer.max", oldBufferMaxConfValue)
+ spark.conf.set(KRYO_SERIALIZER_BUFFER_SIZE.key, oldBufferConfValue)
+ spark.conf.set(KRYO_SERIALIZER_MAX_BUFFER_SIZE.key, oldBufferMaxConfValue)
+ spark.conf.set(SET_COMMAND_REJECTS_SPARK_CORE_CONFS.key, oldSetCommandRejectsSparkCoreConfs)
}
}
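Runtime spark.conf.set(...) on a session can reject Spark-core keys, so the test first relaxes the SQLConf guard, shrinks the Kryo buffer, and restores everything on the way out. The pattern, reduced to a sketch:

    val old = spark.conf.get(SET_COMMAND_REJECTS_SPARK_CORE_CONFS.key, "true")
    spark.conf.set(SET_COMMAND_REJECTS_SPARK_CORE_CONFS.key, "false")
    try {
      spark.conf.set(KRYO_SERIALIZER_BUFFER_SIZE.key, "50b")  // force tiny buffers
      // ... save and reload the model across partitions ...
    } finally {
      spark.conf.set(SET_COMMAND_REJECTS_SPARK_CORE_CONFS.key, old)
    }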
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/MatricesSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/MatricesSuite.scala
index 2c3f846..b4520d4 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/MatricesSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/MatricesSuite.scala
@@ -27,6 +27,7 @@ import org.mockito.Mockito.when
import org.scalatest.mockito.MockitoSugar._
import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.internal.config.Kryo._
import org.apache.spark.ml.{linalg => newlinalg}
import org.apache.spark.mllib.util.TestingUtils._
import org.apache.spark.serializer.KryoSerializer
@@ -34,7 +35,7 @@ import org.apache.spark.serializer.KryoSerializer
class MatricesSuite extends SparkFunSuite {
test("kryo class register") {
val conf = new SparkConf(false)
- conf.set("spark.kryo.registrationRequired", "true")
+ conf.set(KRYO_REGISTRATION_REQUIRED, true)
val ser = new KryoSerializer(conf).newInstance()
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala
index 217b4a3..fee0b02 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala
@@ -25,6 +25,7 @@ import org.json4s.jackson.JsonMethods.{parse => parseJson}
import org.apache.spark.{SparkConf, SparkException, SparkFunSuite}
import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.Kryo._
import org.apache.spark.ml.{linalg => newlinalg}
import org.apache.spark.mllib.util.TestingUtils._
import org.apache.spark.serializer.KryoSerializer
@@ -38,7 +39,7 @@ class VectorsSuite extends SparkFunSuite with Logging {
test("kryo class register") {
val conf = new SparkConf(false)
- conf.set("spark.kryo.registrationRequired", "true")
+ conf.set(KRYO_REGISTRATION_REQUIRED, true)
val ser = new KryoSerializer(conf).newInstance()
def check[T: ClassTag](t: T) {
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/LabeledPointSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/LabeledPointSuite.scala
index c1449ec..d3366dc 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/regression/LabeledPointSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/LabeledPointSuite.scala
@@ -18,6 +18,7 @@
package org.apache.spark.mllib.regression
import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.internal.config.Kryo._
import org.apache.spark.ml.feature.{LabeledPoint => NewLabeledPoint}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.serializer.KryoSerializer
@@ -57,7 +58,7 @@ class LabeledPointSuite extends SparkFunSuite {
test("Kryo class register") {
val conf = new SparkConf(false)
- conf.set("spark.kryo.registrationRequired", "true")
+ conf.set(KRYO_REGISTRATION_REQUIRED, true)
val ser = new KryoSerializer(conf).newInstance()
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/stat/distribution/MultivariateGaussianSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/stat/distribution/MultivariateGaussianSuite.scala
index 5b4a260..4c88fd3 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/stat/distribution/MultivariateGaussianSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/stat/distribution/MultivariateGaussianSuite.scala
@@ -18,6 +18,7 @@
package org.apache.spark.mllib.stat.distribution
import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.internal.config.Kryo._
import org.apache.spark.mllib.linalg.{Matrices, Vectors}
import org.apache.spark.mllib.util.MLlibTestSparkContext
import org.apache.spark.mllib.util.TestingUtils._
@@ -83,7 +84,7 @@ class MultivariateGaussianSuite extends SparkFunSuite with MLlibTestSparkContext
test("Kryo class register") {
val conf = new SparkConf(false)
- conf.set("spark.kryo.registrationRequired", "true")
+ conf.set(KRYO_REGISTRATION_REQUIRED, true)
val ser = new KryoSerializer(conf).newInstance()
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
index 39834fc..e664b64 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
@@ -154,12 +154,11 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf)
KUBERNETES_DRIVER_SUBMIT_CHECK.key -> "true",
MEMORY_OVERHEAD_FACTOR.key -> overheadFactor.toString)
- Seq("spark.jars", "spark.files").foreach { key =>
- conf.getOption(key).foreach { value =>
- val resolved = KubernetesUtils.resolveFileUrisAndPath(Utils.stringToSeq(value))
- if (resolved.nonEmpty) {
- additionalProps.put(key, resolved.mkString(","))
- }
+ Seq(JARS, FILES).foreach { key =>
+ val value = conf.get(key)
+ val resolved = KubernetesUtils.resolveFileUrisAndPath(value)
+ if (resolved.nonEmpty) {
+ additionalProps.put(key.key, resolved.mkString(","))
}
}
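Because JARS and FILES default to Nil, the earlier getOption/stringToSeq plumbing collapses: an unset key simply reads as an empty Seq and the nonEmpty guard alone suffices. A behavioral sketch, assuming the Nil default:

    val conf = new SparkConf(false)
    assert(conf.get(JARS).isEmpty)                  // unset => Nil
    conf.set(JARS, Seq("a.jar", "b.jar"))
    assert(conf.get(JARS) == Seq("a.jar", "b.jar"))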
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStep.scala
index 76b4ec9..bd3f8a1 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStep.scala
@@ -109,21 +109,22 @@ private[spark] class DriverCommandFeatureStep(conf: KubernetesDriverConf)
}
private def additionalJavaProperties(resource: String): Map[String, String] = {
- resourceType(APP_RESOURCE_TYPE_JAVA) ++ mergeFileList("spark.jars", Seq(resource))
+ resourceType(APP_RESOURCE_TYPE_JAVA) ++ mergeFileList(JARS, Seq(resource))
}
private def additionalPythonProperties(resource: String): Map[String, String] = {
resourceType(APP_RESOURCE_TYPE_PYTHON) ++
- mergeFileList("spark.files", Seq(resource) ++ conf.pyFiles)
+ mergeFileList(FILES, Seq(resource) ++ conf.pyFiles)
}
private def additionalRProperties(resource: String): Map[String, String] = {
- resourceType(APP_RESOURCE_TYPE_R) ++ mergeFileList("spark.files", Seq(resource))
+ resourceType(APP_RESOURCE_TYPE_R) ++ mergeFileList(FILES, Seq(resource))
}
- private def mergeFileList(key: String, filesToAdd: Seq[String]): Map[String, String] = {
- val existing = Utils.stringToSeq(conf.get(key, ""))
- Map(key -> (existing ++ filesToAdd).distinct.mkString(","))
+ private def mergeFileList(key: ConfigEntry[Seq[String]], filesToAdd: Seq[String])
+ : Map[String, String] = {
+ val existing = conf.get(key)
+ Map(key.key -> (existing ++ filesToAdd).distinct.mkString(","))
}
private def resourceType(resType: String): Map[String, String] = {
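mergeFileList now receives the entry itself, reads the typed existing value, and writes the merged, de-duplicated list back under entry.key. An illustrative call (the pre-existing value is hypothetical):

    // with spark.jars already holding "dep.jar":
    mergeFileList(JARS, Seq("local:///opt/app.jar"))
    // => Map("spark.jars" -> "dep.jar,local:///opt/app.jar")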
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
index 90255a5..ccf88cc 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
@@ -143,7 +143,7 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
val sparkConf = new SparkConf()
.set(KUBERNETES_DRIVER_POD_NAME, "spark-driver-pod")
.setJars(allJars)
- .set("spark.files", allFiles.mkString(","))
+ .set(FILES, allFiles)
.set(CONTAINER_IMAGE, "spark-driver:latest")
val kubernetesConf = KubernetesTestConf.createDriverConf(sparkConf = sparkConf)
@@ -154,8 +154,8 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
"spark.app.id" -> KubernetesTestConf.APP_ID,
KUBERNETES_EXECUTOR_POD_NAME_PREFIX.key -> kubernetesConf.resourceNamePrefix,
"spark.kubernetes.submitInDriver" -> "true",
- "spark.jars" -> "/opt/spark/jar1.jar,hdfs:///opt/spark/jar2.jar",
- "spark.files" -> "https://localhost:9000/file1.txt,/opt/spark/file2.txt",
+ JARS.key -> "/opt/spark/jar1.jar,hdfs:///opt/spark/jar2.jar",
+ FILES.key -> "https://localhost:9000/file1.txt,/opt/spark/file2.txt",
MEMORY_OVERHEAD_FACTOR.key -> MEMORY_OVERHEAD_FACTOR.defaultValue.get.toString)
assert(additionalProperties === expectedSparkConf)
}
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala
index c869803..e539c8e 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala
@@ -27,6 +27,7 @@ import org.scalatest.concurrent.Eventually
import org.apache.spark.deploy.k8s.integrationtest.TestConstants._
import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.JARS
import org.apache.spark.internal.config.Tests.IS_TESTING
import org.apache.spark.internal.config.UI.UI_ENABLED
@@ -86,7 +87,7 @@ private[spark] class SparkAppConf {
def get(key: String): String = map.getOrElse(key, "")
- def setJars(jars: Seq[String]): Unit = set("spark.jars", jars.mkString(","))
+ def setJars(jars: Seq[String]): Unit = set(JARS.key, jars.mkString(","))
override def toString: String = map.toString
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala
index 32ac4f3..bc1247a 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala
@@ -25,6 +25,7 @@ import org.apache.spark.deploy.mesos.config._
import org.apache.spark.deploy.mesos.ui.MesosClusterUI
import org.apache.spark.deploy.rest.mesos.MesosRestServer
import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.Deploy._
import org.apache.spark.scheduler.cluster.mesos._
import org.apache.spark.util.{CommandLineUtils, ShutdownHookManager, SparkUncaughtExceptionHandler, Utils}
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala
index dd0b2ba..2b8655c 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala
@@ -63,11 +63,6 @@ package object config {
.timeConf(TimeUnit.SECONDS)
.createWithDefaultString("30s")
- private[spark] val RECOVERY_MODE =
- ConfigBuilder("spark.deploy.recoveryMode")
- .stringConf
- .createWithDefault("NONE")
-
private[spark] val DISPATCHER_WEBUI_URL =
ConfigBuilder("spark.mesos.dispatcher.webui.url")
.doc("Set the Spark Mesos dispatcher webui_url for interacting with the " +
@@ -75,13 +70,6 @@ package object config {
.stringConf
.createOptional
- private[spark] val ZOOKEEPER_URL =
- ConfigBuilder("spark.deploy.zookeeper.url")
- .doc("When `spark.deploy.recoveryMode` is set to ZOOKEEPER, this " +
- "configuration is used to set the zookeeper URL to connect to.")
- .stringConf
- .createOptional
-
private[spark] val HISTORY_SERVER_URL =
ConfigBuilder("spark.mesos.dispatcher.historyServer.url")
.doc("Set the URL of the history server. The dispatcher will then " +
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterPersistenceEngine.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterPersistenceEngine.scala
index 61ab3e8..123412f 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterPersistenceEngine.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterPersistenceEngine.scala
@@ -26,6 +26,7 @@ import org.apache.zookeeper.KeeperException.NoNodeException
import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkCuratorUtil
import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.Deploy._
import org.apache.spark.util.Utils
/**
@@ -94,13 +95,13 @@ private[spark] class ZookeeperMesosClusterPersistenceEngine(
zk: CuratorFramework,
conf: SparkConf)
extends MesosClusterPersistenceEngine with Logging {
- private val WORKING_DIR =
- conf.get("spark.deploy.zookeeper.dir", "/spark_mesos_dispatcher") + "/" + baseDir
+ private val workingDir =
+ conf.get(ZOOKEEPER_DIRECTORY).getOrElse("/spark_mesos_dispatcher") + "/" + baseDir
- SparkCuratorUtil.mkdir(zk, WORKING_DIR)
+ SparkCuratorUtil.mkdir(zk, workingDir)
def path(name: String): String = {
- WORKING_DIR + "/" + name
+ workingDir + "/" + name
}
override def expunge(name: String): Unit = {
@@ -129,6 +130,6 @@ private[spark] class ZookeeperMesosClusterPersistenceEngine(
}
override def fetchAll[T](): Iterable[T] = {
- zk.getChildren.forPath(WORKING_DIR).asScala.flatMap(fetch[T])
+ zk.getChildren.forPath(workingDir).asScala.flatMap(fetch[T])
}
}
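The getOrElse above implies ZOOKEEPER_DIRECTORY is an optional entry with no default, so the Mesos-specific fallback stays in the caller. A sketch of the assumed declaration:

    private[spark] val ZOOKEEPER_DIRECTORY =
      ConfigBuilder("spark.deploy.zookeeper.dir")
        .stringConf
        .createOptional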
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
index 021b1ac..8c961a5 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -32,7 +32,7 @@ import org.apache.mesos.Protos.TaskStatus.Reason
import org.apache.spark.{SecurityManager, SparkConf, SparkException, TaskState}
import org.apache.spark.deploy.mesos.{config, MesosDriverDescription}
import org.apache.spark.deploy.rest.{CreateSubmissionResponse, KillSubmissionResponse, SubmissionStatusResponse}
-import org.apache.spark.internal.config.{CORES_MAX, EXECUTOR_LIBRARY_PATH, EXECUTOR_MEMORY}
+import org.apache.spark.internal.config._
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.util.Utils
@@ -432,7 +432,7 @@ private[spark] class MesosClusterScheduler(
private def getDriverUris(desc: MesosDriverDescription): List[CommandInfo.URI] = {
val confUris = List(conf.getOption("spark.mesos.uris"),
desc.conf.getOption("spark.mesos.uris"),
- desc.conf.getOption("spark.submit.pyFiles")).flatMap(
+ Some(desc.conf.get(SUBMIT_PYTHON_FILES).mkString(","))).flatMap(
_.map(_.split(",").map(_.trim))
).flatten
@@ -534,16 +534,16 @@ private[spark] class MesosClusterScheduler(
desc.conf.getOption(CORES_MAX.key).foreach { v =>
options ++= Seq("--total-executor-cores", v)
}
- desc.conf.getOption("spark.submit.pyFiles").foreach { pyFiles =>
- val formattedFiles = pyFiles.split(",")
- .map { path => new File(sandboxPath, path.split("/").last).toString() }
- .mkString(",")
- options ++= Seq("--py-files", formattedFiles)
- }
+
+ val pyFiles = desc.conf.get(SUBMIT_PYTHON_FILES)
+ val formattedFiles = pyFiles.map { path =>
+ new File(sandboxPath, path.split("/").last).toString()
+ }.mkString(",")
+ options ++= Seq("--py-files", formattedFiles)
// --conf
val replicatedOptionsBlacklist = Set(
- "spark.jars", // Avoids duplicate classes in classpath
+ JARS.key, // Avoids duplicate classes in classpath
"spark.submit.deployMode", // this would be set to `cluster`, but we need client
"spark.master" // this contains the address of the dispatcher, not master
)
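The spark.submit.pyFiles change depends on SUBMIT_PYTHON_FILES being sequence-typed: desc.conf.get(SUBMIT_PYTHON_FILES) returns a Seq[String] (empty when unset, assuming a Nil default), which is why the old getOption(...).foreach guard becomes an unconditional append. One observable difference: --py-files is now always emitted, with an empty value when no files are configured. Assumed shape of the entry:

    // Assumed definition; toSequence splits the comma-separated raw value
    // into a Seq[String], defaulting to Nil when the key is unset.
    private[spark] val SUBMIT_PYTHON_FILES =
      ConfigBuilder("spark.submit.pyFiles")
        .stringConf
        .toSequence
        .createWithDefault(Nil)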
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 8492180..7992292 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -68,7 +68,7 @@ private[spark] class Client(
private val yarnClient = YarnClient.createYarnClient
private val hadoopConf = new YarnConfiguration(SparkHadoopUtil.newConfiguration(sparkConf))
- private val isClusterMode = sparkConf.get("spark.submit.deployMode", "client") == "cluster"
+ private val isClusterMode = sparkConf.get(SUBMIT_DEPLOY_MODE) == "cluster"
// AM related configurations
private val amMemory = if (isClusterMode) {
@@ -1532,8 +1532,8 @@ private[spark] class YarnClusterApplication extends SparkApplication {
override def start(args: Array[String], conf: SparkConf): Unit = {
// SparkSubmit would use yarn cache to distribute files & jars in yarn mode,
// so remove them from sparkConf here for yarn mode.
- conf.remove("spark.jars")
- conf.remove("spark.files")
+ conf.remove(JARS)
+ conf.remove(FILES)
new Client(new ClientArguments(args), conf).run()
}
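SparkConf.remove also takes a ConfigEntry[_] overload, so the literal key string stays defined in exactly one place. A sketch, with the shape of the JARS entry assumed:

    // Assumed definition in org.apache.spark.internal.config:
    private[spark] val JARS = ConfigBuilder("spark.jars")
      .stringConf
      .toSequence
      .createWithDefault(Nil)

    // Removing by entry is equivalent to removing by its key string:
    conf.remove(JARS)   // same effect as conf.remove("spark.jars")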
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index 9acd995..25827fd 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -40,6 +40,7 @@ import org.scalatest.Matchers
import org.apache.spark.{SparkConf, SparkFunSuite, TestUtils}
import org.apache.spark.deploy.yarn.config._
+import org.apache.spark.internal.config._
import org.apache.spark.util.{SparkConfWithEnv, Utils}
class ClientSuite extends SparkFunSuite with Matchers {
@@ -368,7 +369,7 @@ class ClientSuite extends SparkFunSuite with Matchers {
val resources = Map("fpga" -> 2, "gpu" -> 3)
ResourceRequestTestHelper.initializeResourceTypes(resources.keys.toSeq)
- val conf = new SparkConf().set("spark.submit.deployMode", deployMode)
+ val conf = new SparkConf().set(SUBMIT_DEPLOY_MODE, deployMode)
resources.foreach { case (name, v) =>
conf.set(prefix + name, v.toString)
}
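The typed set in this test means the value is checked against the entry's type (String here) at compile time, so a mistyped value can no longer slip in silently. Illustrative only:

    val conf = new SparkConf()
      .set(SUBMIT_DEPLOY_MODE, "cluster")   // compiles: entry holds a String
    // conf.set(SUBMIT_DEPLOY_MODE, true)   // would be rejected at compile time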
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index b7e83c8..faddb8f 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -443,7 +443,7 @@ private object YarnClusterDriver extends Logging with Matchers {
// If we are running in yarn-cluster mode, verify that driver logs links and present and are
// in the expected format.
- if (conf.get("spark.submit.deployMode") == "cluster") {
+ if (conf.get(SUBMIT_DEPLOY_MODE) == "cluster") {
assert(listener.driverLogs.nonEmpty)
val driverLogs = listener.driverLogs.get
assert(driverLogs.size === 2)
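The read side gains the same safety: assuming SUBMIT_DEPLOY_MODE is declared with createWithDefault("client"), matching the old call-site default in Client.scala above, conf.get(SUBMIT_DEPLOY_MODE) folds the default in, so tests no longer spell out the raw key and fallback themselves. Assumed definition:

    private[spark] val SUBMIT_DEPLOY_MODE =
      ConfigBuilder("spark.submit.deployMode")
        .stringConf
        .createWithDefault("client")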
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSerializerRegistratorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSerializerRegistratorSuite.scala
index 68f7de0..69728ef 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSerializerRegistratorSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSerializerRegistratorSuite.scala
@@ -21,6 +21,7 @@ import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.esotericsoftware.kryo.io.{Input, Output}
import org.apache.spark.SparkConf
+import org.apache.spark.internal.config.Kryo._
import org.apache.spark.serializer.KryoRegistrator
import org.apache.spark.sql.test.SharedSQLContext
@@ -33,7 +34,7 @@ class DatasetSerializerRegistratorSuite extends QueryTest with SharedSQLContext
override protected def sparkConf: SparkConf = {
// Make sure we use the KryoRegistrator
- super.sparkConf.set("spark.kryo.registrator", TestRegistrator().getClass.getCanonicalName)
+ super.sparkConf.set(KRYO_USER_REGISTRATORS, TestRegistrator().getClass.getCanonicalName)
}
test("Kryo registrator") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArrayBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArrayBenchmark.scala
index e174dc6..0869e25 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArrayBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArrayBenchmark.scala
@@ -45,8 +45,8 @@ object ExternalAppendOnlyUnsafeRowArrayBenchmark extends BenchmarkBase {
private val conf = new SparkConf(false)
// Make the Java serializer write a reset instruction (TC_RESET) after each object to test
// for a bug we had with bytes written past the last object in a batch (SPARK-2792)
- .set("spark.serializer.objectStreamReset", "1")
- .set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
+ .set(config.SERIALIZER_OBJECT_STREAM_RESET, 1)
+ .set(config.SERIALIZER, "org.apache.spark.serializer.JavaSerializer")
private def withFakeTaskContext(f: => Unit): Unit = {
val sc = new SparkContext("local", "test", conf)
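Note the typed value above: SERIALIZER_OBJECT_STREAM_RESET is evidently an intConf, so the benchmark now passes 1 instead of the string "1". Assumed definitions (defaults guessed from the documented behavior of these keys, not read from this patch):

    private[spark] val SERIALIZER_OBJECT_STREAM_RESET =
      ConfigBuilder("spark.serializer.objectStreamReset")
        .intConf
        .createWithDefault(100)

    private[spark] val SERIALIZER =
      ConfigBuilder("spark.serializer")
        .stringConf
        .createWithDefault("org.apache.spark.serializer.JavaSerializer")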
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
index 7b55e83..1c89910 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
@@ -22,7 +22,8 @@ import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream,
import scala.util.Random
import org.apache.spark.{SparkConf, SparkFunSuite}
-import org.apache.spark.internal.config.MEMORY_OFFHEAP_ENABLED
+import org.apache.spark.internal.config._
+import org.apache.spark.internal.config.Kryo._
import org.apache.spark.memory.{TaskMemoryManager, UnifiedMemoryManager}
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.sql.catalyst.InternalRow
@@ -309,7 +310,7 @@ class HashedRelationSuite extends SparkFunSuite with SharedSQLContext {
test("Spark-14521") {
val ser = new KryoSerializer(
- (new SparkConf).set("spark.kryo.referenceTracking", "false")).newInstance()
+ (new SparkConf).set(KRYO_REFERENCE_TRACKING, false)).newInstance()
val key = Seq(BoundReference(0, LongType, false))
// Testing Kryo serialization of HashedRelation
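Likewise, KRYO_REFERENCE_TRACKING is evidently a booleanConf, so the test passes false rather than "false". Assumed definition (a default of true matches the documented behavior of spark.kryo.referenceTracking):

    private[spark] val KRYO_REFERENCE_TRACKING =
      ConfigBuilder("spark.kryo.referenceTracking")
        .booleanConf
        .createWithDefault(true)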