Posted to commits@spark.apache.org by gu...@apache.org on 2019/05/22 01:45:30 UTC

[spark] branch master updated: [SPARK-27774][CORE][MLLIB] Avoid hardcoded configs

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new e7443d6  [SPARK-27774][CORE][MLLIB] Avoid hardcoded configs
e7443d6 is described below

commit e7443d6412582aa16769e298c31d889a5ba0143c
Author: wenxuanguan <ch...@126.com>
AuthorDate: Wed May 22 10:45:11 2019 +0900

    [SPARK-27774][CORE][MLLIB] Avoid hardcoded configs
    
    ## What changes were proposed in this pull request?
    
    Avoid hardcoded config keys in `SparkConf`, `SparkSubmit`, and the related tests by using the predefined `ConfigEntry` constants instead (the pattern is sketched after the file summary below).
    
    ## How was this patch tested?
    
    N/A
    
    Closes #24631 from wenxuanguan/minor-fix.
    
    Authored-by: wenxuanguan <ch...@126.com>
    Signed-off-by: HyukjinKwon <gu...@apache.org>
---
 .../main/scala/org/apache/spark/SparkConf.scala    |  8 ++++----
 .../main/scala/org/apache/spark/api/r/RUtils.scala |  2 +-
 .../scala/org/apache/spark/deploy/RRunner.scala    |  3 ++-
 .../org/apache/spark/deploy/SparkSubmit.scala      |  6 +++---
 .../apache/spark/deploy/worker/DriverWrapper.scala |  2 +-
 .../scala/org/apache/spark/SparkConfSuite.scala    |  2 +-
 .../scala/org/apache/spark/SparkContextSuite.scala |  2 +-
 .../org/apache/spark/deploy/SparkSubmitSuite.scala |  4 ++--
 .../deploy/StandaloneDynamicAllocationSuite.scala  |  6 +++---
 .../deploy/rest/SubmitRestProtocolSuite.scala      | 24 +++++++++++-----------
 .../storage/BlockManagerReplicationSuite.scala     |  6 +++---
 .../apache/spark/storage/BlockManagerSuite.scala   |  5 +++--
 .../apache/spark/storage/MemoryStoreSuite.scala    |  5 +++--
 .../org/apache/spark/ml/feature/Word2Vec.scala     |  3 ++-
 .../org/apache/spark/mllib/feature/Word2Vec.scala  |  3 ++-
 .../cluster/mesos/MesosClusterScheduler.scala      |  2 +-
 .../spark/deploy/yarn/ApplicationMaster.scala      |  2 +-
 17 files changed, 45 insertions(+), 40 deletions(-)
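
Every hunk below follows the same pattern: hardcoded key strings such as "spark.submit.deployMode" or "spark.kryoserializer.buffer" are replaced with the corresponding ConfigEntry constants from org.apache.spark.internal.config (and its Kryo object). A minimal sketch of that pattern, using only constants that appear in the diff below; the object name and main method are illustrative, and it assumes compilation inside Spark's own source tree, since the entries and the typed setters are private[spark]:

import org.apache.spark.SparkConf
import org.apache.spark.internal.config.{CORES_MAX, SUBMIT_DEPLOY_MODE}
import org.apache.spark.internal.config.Kryo.KRYO_SERIALIZER_BUFFER_SIZE

object ConfigEntrySketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(false)
      // Typed setter: the entry carries the value type, so no string formatting.
      .set(CORES_MAX, 8)
      // String setter: ConfigEntry.key replaces the literal "spark.kryoserializer.buffer".
      .set(KRYO_SERIALIZER_BUFFER_SIZE.key, "1m")

    // Reading through the entry falls back to its declared default ("client") when unset.
    val deployMode = conf.get(SUBMIT_DEPLOY_MODE)
    println(s"${SUBMIT_DEPLOY_MODE.key} = $deployMode")
    println(s"${CORES_MAX.key} = ${conf.get(CORES_MAX)}")
  }
}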

diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index aa93f42..bd2ef5b 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -595,7 +595,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
     // it will almost always cause ExecutorLostFailure. See SPARK-22754.
     require(executorTimeoutThresholdMs > executorHeartbeatIntervalMs, "The value of " +
       s"${networkTimeout}=${executorTimeoutThresholdMs}ms must be no less than the value of " +
-      s"spark.executor.heartbeatInterval=${executorHeartbeatIntervalMs}ms.")
+      s"${EXECUTOR_HEARTBEAT_INTERVAL.key}=${executorHeartbeatIntervalMs}ms.")
   }
 
   /**
@@ -667,12 +667,12 @@ private[spark] object SparkConf extends Logging {
         translation = s => s"${s.toLong * 10}s")),
     REDUCER_MAX_SIZE_IN_FLIGHT.key -> Seq(
       AlternateConfig("spark.reducer.maxMbInFlight", "1.4")),
-    "spark.kryoserializer.buffer" -> Seq(
+    KRYO_SERIALIZER_BUFFER_SIZE.key -> Seq(
       AlternateConfig("spark.kryoserializer.buffer.mb", "1.4",
         translation = s => s"${(s.toDouble * 1000).toInt}k")),
-    "spark.kryoserializer.buffer.max" -> Seq(
+    KRYO_SERIALIZER_MAX_BUFFER_SIZE.key -> Seq(
       AlternateConfig("spark.kryoserializer.buffer.max.mb", "1.4")),
-    "spark.shuffle.file.buffer" -> Seq(
+    SHUFFLE_FILE_BUFFER_SIZE.key -> Seq(
       AlternateConfig("spark.shuffle.file.buffer.kb", "1.4")),
     EXECUTOR_LOGS_ROLLING_MAX_SIZE.key -> Seq(
       AlternateConfig("spark.executor.logs.rolling.size.maxBytes", "1.4")),
diff --git a/core/src/main/scala/org/apache/spark/api/r/RUtils.scala b/core/src/main/scala/org/apache/spark/api/r/RUtils.scala
index 5a43302..311fade 100644
--- a/core/src/main/scala/org/apache/spark/api/r/RUtils.scala
+++ b/core/src/main/scala/org/apache/spark/api/r/RUtils.scala
@@ -60,7 +60,7 @@ private[spark] object RUtils {
   def sparkRPackagePath(isDriver: Boolean): Seq[String] = {
     val (master, deployMode) =
       if (isDriver) {
-        (sys.props("spark.master"), sys.props("spark.submit.deployMode"))
+        (sys.props("spark.master"), sys.props(SUBMIT_DEPLOY_MODE.key))
       } else {
         val sparkConf = SparkEnv.get.conf
         (sparkConf.get("spark.master"), sparkConf.get(SUBMIT_DEPLOY_MODE))
diff --git a/core/src/main/scala/org/apache/spark/deploy/RRunner.scala b/core/src/main/scala/org/apache/spark/deploy/RRunner.scala
index 6284e6a..60ba047 100644
--- a/core/src/main/scala/org/apache/spark/deploy/RRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/RRunner.scala
@@ -27,6 +27,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.{SparkException, SparkUserAppException}
 import org.apache.spark.api.r.{RBackend, RUtils}
 import org.apache.spark.internal.config.R._
+import org.apache.spark.internal.config.SUBMIT_DEPLOY_MODE
 import org.apache.spark.util.RedirectThread
 
 /**
@@ -46,7 +47,7 @@ object RRunner {
       // but kept here for backward compatibility.
       var cmd = sys.props.getOrElse(SPARKR_COMMAND.key, SPARKR_COMMAND.defaultValue.get)
       cmd = sys.props.getOrElse(R_COMMAND.key, cmd)
-      if (sys.props.getOrElse("spark.submit.deployMode", "client") == "client") {
+      if (sys.props.getOrElse(SUBMIT_DEPLOY_MODE.key, "client") == "client") {
         cmd = sys.props.getOrElse("spark.r.driver.command", cmd)
       }
       cmd
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 49d9395..59b638b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -517,7 +517,7 @@ private[spark] class SparkSubmit extends Logging {
       // All cluster managers
       OptionAssigner(args.master, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, confKey = "spark.master"),
       OptionAssigner(args.deployMode, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
-        confKey = "spark.submit.deployMode"),
+        confKey = SUBMIT_DEPLOY_MODE.key),
       OptionAssigner(args.name, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, confKey = "spark.app.name"),
       OptionAssigner(args.ivyRepoPath, ALL_CLUSTER_MGRS, CLIENT, confKey = "spark.jars.ivy"),
       OptionAssigner(args.driverMemory, ALL_CLUSTER_MGRS, CLIENT,
@@ -747,8 +747,8 @@ private[spark] class SparkSubmit extends Logging {
 
     // Resolve paths in certain spark properties
     val pathConfigs = Seq(
-      "spark.jars",
-      "spark.files",
+      JARS.key,
+      FILES.key,
       "spark.yarn.dist.files",
       "spark.yarn.dist.archives",
       "spark.yarn.dist.jars")
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
index 1e8ad0b..9b51bea 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
@@ -91,7 +91,7 @@ object DriverWrapper extends Logging {
     val resolvedMavenCoordinates = DependencyUtils.resolveMavenDependencies(packagesExclusions,
       packages, repositories, ivyRepoPath, Option(ivySettingsPath))
     val jars = {
-      val jarsProp = sys.props.get("spark.jars").orNull
+      val jarsProp = sys.props.get(config.JARS.key).orNull
       if (!StringUtils.isBlank(resolvedMavenCoordinates)) {
         DependencyUtils.mergeFileLists(jarsProp, resolvedMavenCoordinates)
       } else {
diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
index 3a5de8a..83a9ea3 100644
--- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
@@ -248,7 +248,7 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst
     assert(conf.getTimeAsSeconds("spark.yarn.am.waitTime") === 420)
 
     conf.set("spark.kryoserializer.buffer.mb", "1.1")
-    assert(conf.getSizeAsKb("spark.kryoserializer.buffer") === 1100)
+    assert(conf.getSizeAsKb(KRYO_SERIALIZER_BUFFER_SIZE.key) === 1100)
 
     conf.set("spark.history.fs.cleaner.maxAge.seconds", "42")
     assert(conf.get(MAX_LOG_AGE_S) === 42L)
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index 3490eaf..1dcb2f7 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -671,7 +671,7 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
       // task metrics via heartbeat to driver.
       .set(EXECUTOR_HEARTBEAT_DROP_ZERO_ACCUMULATOR_UPDATES.key, "false")
       // Set a short heartbeat interval to send SparkListenerExecutorMetricsUpdate fast
-      .set("spark.executor.heartbeatInterval", "1s")
+      .set(EXECUTOR_HEARTBEAT_INTERVAL.key, "1s")
     sc = new SparkContext(conf)
     sc.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "true")
     @volatile var runningTaskIds: Seq[Long] = null
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index ef6213e..b0c187d 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -375,8 +375,8 @@ class SparkSubmitSuite
     confMap.keys should contain ("spark.app.name")
     confMap.keys should contain (JARS.key)
     confMap.keys should contain ("spark.driver.memory")
-    confMap.keys should contain ("spark.driver.cores")
-    confMap.keys should contain ("spark.driver.supervise")
+    confMap.keys should contain (DRIVER_CORES.key)
+    confMap.keys should contain (DRIVER_SUPERVISE.key)
     confMap.keys should contain (UI_ENABLED.key)
     confMap.keys should contain (SUBMIT_DEPLOY_MODE.key)
     conf.get(UI_ENABLED) should be (false)
diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
index 39daf51..4c6a669 100644
--- a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
@@ -135,7 +135,7 @@ class StandaloneDynamicAllocationSuite
   }
 
   test("dynamic allocation with max cores <= cores per worker") {
-    sc = new SparkContext(appConf.set("spark.cores.max", "8"))
+    sc = new SparkContext(appConf.set(config.CORES_MAX, 8))
     val appId = sc.applicationId
     eventually(timeout(10.seconds), interval(10.millis)) {
       val apps = getApplications()
@@ -190,7 +190,7 @@ class StandaloneDynamicAllocationSuite
   }
 
   test("dynamic allocation with max cores > cores per worker") {
-    sc = new SparkContext(appConf.set("spark.cores.max", "16"))
+    sc = new SparkContext(appConf.set(config.CORES_MAX, 16))
     val appId = sc.applicationId
     eventually(timeout(10.seconds), interval(10.millis)) {
       val apps = getApplications()
@@ -297,7 +297,7 @@ class StandaloneDynamicAllocationSuite
   test("dynamic allocation with cores per executor AND max cores") {
     sc = new SparkContext(appConf
       .set(config.EXECUTOR_CORES, 2)
-      .set("spark.cores.max", "8"))
+      .set(config.CORES_MAX, 8))
     val appId = sc.applicationId
     eventually(timeout(10.seconds), interval(10.millis)) {
       val apps = getApplications()
diff --git a/core/src/test/scala/org/apache/spark/deploy/rest/SubmitRestProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/rest/SubmitRestProtocolSuite.scala
index 87655f3..03102fd 100644
--- a/core/src/test/scala/org/apache/spark/deploy/rest/SubmitRestProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/rest/SubmitRestProtocolSuite.scala
@@ -95,27 +95,27 @@ class SubmitRestProtocolSuite extends SparkFunSuite {
     message.validate()
     // optional fields
     conf.set(JARS, Seq("mayonnaise.jar", "ketchup.jar"))
-    conf.set("spark.files", "fireball.png")
+    conf.set(FILES.key, "fireball.png")
     conf.set("spark.driver.memory", s"${Utils.DEFAULT_DRIVER_MEM_MB}m")
-    conf.set("spark.driver.cores", "180")
+    conf.set(DRIVER_CORES, 180)
     conf.set("spark.driver.extraJavaOptions", " -Dslices=5 -Dcolor=mostly_red")
     conf.set("spark.driver.extraClassPath", "food-coloring.jar")
     conf.set("spark.driver.extraLibraryPath", "pickle.jar")
-    conf.set("spark.driver.supervise", "false")
+    conf.set(DRIVER_SUPERVISE, false)
     conf.set("spark.executor.memory", "256m")
-    conf.set("spark.cores.max", "10000")
+    conf.set(CORES_MAX, 10000)
     message.sparkProperties = conf.getAll.toMap
     message.appArgs = Array("two slices", "a hint of cinnamon")
     message.environmentVariables = Map("PATH" -> "/dev/null")
     message.validate()
     // bad fields
-    var badConf = conf.clone().set("spark.driver.cores", "one hundred feet")
+    var badConf = conf.clone().set(DRIVER_CORES.key, "one hundred feet")
     message.sparkProperties = badConf.getAll.toMap
     intercept[SubmitRestProtocolException] { message.validate() }
-    badConf = conf.clone().set("spark.driver.supervise", "nope, never")
+    badConf = conf.clone().set(DRIVER_SUPERVISE.key, "nope, never")
     message.sparkProperties = badConf.getAll.toMap
     intercept[SubmitRestProtocolException] { message.validate() }
-    badConf = conf.clone().set("spark.cores.max", "two men")
+    badConf = conf.clone().set(CORES_MAX.key, "two men")
     message.sparkProperties = badConf.getAll.toMap
     intercept[SubmitRestProtocolException] { message.validate() }
     message.sparkProperties = conf.getAll.toMap
@@ -127,17 +127,17 @@ class SubmitRestProtocolSuite extends SparkFunSuite {
     assert(newMessage.appResource === "honey-walnut-cherry.jar")
     assert(newMessage.mainClass === "org.apache.spark.examples.SparkPie")
     assert(newMessage.sparkProperties("spark.app.name") === "SparkPie")
-    assert(newMessage.sparkProperties("spark.jars") === "mayonnaise.jar,ketchup.jar")
-    assert(newMessage.sparkProperties("spark.files") === "fireball.png")
+    assert(newMessage.sparkProperties(JARS.key) === "mayonnaise.jar,ketchup.jar")
+    assert(newMessage.sparkProperties(FILES.key) === "fireball.png")
     assert(newMessage.sparkProperties("spark.driver.memory") === s"${Utils.DEFAULT_DRIVER_MEM_MB}m")
-    assert(newMessage.sparkProperties("spark.driver.cores") === "180")
+    assert(newMessage.sparkProperties(DRIVER_CORES.key) === "180")
     assert(newMessage.sparkProperties("spark.driver.extraJavaOptions") ===
       " -Dslices=5 -Dcolor=mostly_red")
     assert(newMessage.sparkProperties("spark.driver.extraClassPath") === "food-coloring.jar")
     assert(newMessage.sparkProperties("spark.driver.extraLibraryPath") === "pickle.jar")
-    assert(newMessage.sparkProperties("spark.driver.supervise") === "false")
+    assert(newMessage.sparkProperties(DRIVER_SUPERVISE.key) === "false")
     assert(newMessage.sparkProperties("spark.executor.memory") === "256m")
-    assert(newMessage.sparkProperties("spark.cores.max") === "10000")
+    assert(newMessage.sparkProperties(CORES_MAX.key) === "10000")
     assert(newMessage.appArgs === message.appArgs)
     assert(newMessage.sparkProperties === message.sparkProperties)
     assert(newMessage.environmentVariables === message.environmentVariables)
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
index a739701..25c7ef3 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
@@ -416,12 +416,12 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite
 
 class BlockManagerReplicationSuite extends BlockManagerReplicationBehavior {
   val conf = new SparkConf(false).set("spark.app.id", "test")
-  conf.set("spark.kryoserializer.buffer", "1m")
+  conf.set(Kryo.KRYO_SERIALIZER_BUFFER_SIZE.key, "1m")
 }
 
 class BlockManagerProactiveReplicationSuite extends BlockManagerReplicationBehavior {
   val conf = new SparkConf(false).set("spark.app.id", "test")
-  conf.set("spark.kryoserializer.buffer", "1m")
+  conf.set(Kryo.KRYO_SERIALIZER_BUFFER_SIZE.key, "1m")
   conf.set(STORAGE_REPLICATION_PROACTIVE, true)
   conf.set(STORAGE_EXCEPTION_PIN_LEAK, true)
 
@@ -496,7 +496,7 @@ class DummyTopologyMapper(conf: SparkConf) extends TopologyMapper(conf) with Log
 
 class BlockManagerBasicStrategyReplicationSuite extends BlockManagerReplicationBehavior {
   val conf: SparkConf = new SparkConf(false).set("spark.app.id", "test")
-  conf.set("spark.kryoserializer.buffer", "1m")
+  conf.set(Kryo.KRYO_SERIALIZER_BUFFER_SIZE.key, "1m")
   conf.set(
     STORAGE_REPLICATION_POLICY,
     classOf[BasicBlockReplicationPolicy].getName)
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 9f3d8f2..78f24fa 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -74,7 +74,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
   val shuffleManager = new SortShuffleManager(new SparkConf(false))
 
   // Reuse a serializer across tests to avoid creating a new thread-local buffer on each test
-  val serializer = new KryoSerializer(new SparkConf(false).set("spark.kryoserializer.buffer", "1m"))
+  val serializer = new KryoSerializer(
+    new SparkConf(false).set(Kryo.KRYO_SERIALIZER_BUFFER_SIZE.key, "1m"))
 
   // Implicitly convert strings to BlockIds for test clarity.
   implicit def StringToBlockId(value: String): BlockId = new TestBlockId(value)
@@ -86,7 +87,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
       .set(IS_TESTING, true)
       .set(MEMORY_FRACTION, 1.0)
       .set(MEMORY_STORAGE_FRACTION, 0.999)
-      .set("spark.kryoserializer.buffer", "1m")
+      .set(Kryo.KRYO_SERIALIZER_BUFFER_SIZE.key, "1m")
       .set(STORAGE_UNROLL_MEMORY_THRESHOLD, 512L)
   }
 
diff --git a/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala b/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala
index 958d57d..a723141 100644
--- a/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala
@@ -25,7 +25,7 @@ import scala.reflect.ClassTag
 import org.scalatest._
 
 import org.apache.spark._
-import org.apache.spark.internal.config.STORAGE_UNROLL_MEMORY_THRESHOLD
+import org.apache.spark.internal.config._
 import org.apache.spark.memory.{MemoryMode, UnifiedMemoryManager}
 import org.apache.spark.serializer.{KryoSerializer, SerializerManager}
 import org.apache.spark.storage.memory.{BlockEvictionHandler, MemoryStore, PartiallySerializedBlock, PartiallyUnrolledIterator}
@@ -42,7 +42,8 @@ class MemoryStoreSuite
     .set(STORAGE_UNROLL_MEMORY_THRESHOLD, 512L)
 
   // Reuse a serializer across tests to avoid creating a new thread-local buffer on each test
-  val serializer = new KryoSerializer(new SparkConf(false).set("spark.kryoserializer.buffer", "1m"))
+  val serializer = new KryoSerializer(
+    new SparkConf(false).set(Kryo.KRYO_SERIALIZER_BUFFER_SIZE.key, "1m"))
 
   val serializerManager = new SerializerManager(serializer, conf)
 
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala
index fc9996d..6ae90b8 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala
@@ -20,6 +20,7 @@ package org.apache.spark.ml.feature
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.annotation.Since
+import org.apache.spark.internal.config.Kryo.KRYO_SERIALIZER_MAX_BUFFER_SIZE
 import org.apache.spark.ml.{Estimator, Model}
 import org.apache.spark.ml.linalg.{BLAS, Vector, Vectors, VectorUDT}
 import org.apache.spark.ml.param._
@@ -339,7 +340,7 @@ object Word2VecModel extends MLReadable[Word2VecModel] {
       val wordVectors = instance.wordVectors.getVectors
       val dataPath = new Path(path, "data").toString
       val bufferSizeInBytes = Utils.byteStringAsBytes(
-        sc.conf.get("spark.kryoserializer.buffer.max", "64m"))
+        sc.conf.get(KRYO_SERIALIZER_MAX_BUFFER_SIZE.key, "64m"))
       val numPartitions = Word2VecModelWriter.calculateNumberOfPartitions(
         bufferSizeInBytes, instance.wordVectors.wordIndex.size, instance.getVectorSize)
       val spark = sparkSession
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala
index 94c4fcc..9e19ff2 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala
@@ -32,6 +32,7 @@ import org.apache.spark.annotation.Since
 import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.Kryo.KRYO_SERIALIZER_MAX_BUFFER_SIZE
 import org.apache.spark.mllib.linalg.{Vector, Vectors}
 import org.apache.spark.mllib.util.{Loader, Saveable}
 import org.apache.spark.rdd._
@@ -679,7 +680,7 @@ object Word2VecModel extends Loader[Word2VecModel] {
       // We want to partition the model in partitions smaller than
       // spark.kryoserializer.buffer.max
       val bufferSize = Utils.byteStringAsBytes(
-        spark.conf.get("spark.kryoserializer.buffer.max", "64m"))
+        spark.conf.get(KRYO_SERIALIZER_MAX_BUFFER_SIZE.key, "64m"))
       // We calculate the approximate size of the model
       // We only calculate the array size, considering an
       // average string size of 15 bytes, the formula is:
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
index 8566a30..289b109 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -541,7 +541,7 @@ private[spark] class MesosClusterScheduler(
     // --conf
     val replicatedOptionsBlacklist = Set(
       JARS.key, // Avoids duplicate classes in classpath
-      "spark.submit.deployMode", // this would be set to `cluster`, but we need client
+      SUBMIT_DEPLOY_MODE.key, // this would be set to `cluster`, but we need client
       "spark.master" // this contains the address of the dispatcher, not master
     )
     val defaultConf = conf.getAllWithPrefix(config.DISPATCHER_DRIVER_DEFAULT_PREFIX).toMap
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index e4b6b3d..5a67caf 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -215,7 +215,7 @@ private[spark] class ApplicationMaster(
 
         // Set the master and deploy mode property to match the requested mode.
         System.setProperty("spark.master", "yarn")
-        System.setProperty("spark.submit.deployMode", "cluster")
+        System.setProperty(SUBMIT_DEPLOY_MODE.key, "cluster")
 
         // Set this internal configuration if it is running on cluster mode, this
         // configuration will be checked in SparkContext to avoid misuse of yarn cluster mode.

