You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2019/05/22 01:45:30 UTC
[spark] branch master updated: [SPARK-27774][CORE][MLLIB] Avoid
hardcoded configs
This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new e7443d6 [SPARK-27774][CORE][MLLIB] Avoid hardcoded configs
e7443d6 is described below
commit e7443d6412582aa16769e298c31d889a5ba0143c
Author: wenxuanguan <ch...@126.com>
AuthorDate: Wed May 22 10:45:11 2019 +0900
[SPARK-27774][CORE][MLLIB] Avoid hardcoded configs
## What changes were proposed in this pull request?
Avoid hardcoded config keys in `SparkConf` and `SparkSubmit`, and in the related tests.
## How was this patch tested?
N/A
Closes #24631 from wenxuanguan/minor-fix.
Authored-by: wenxuanguan <ch...@126.com>
Signed-off-by: HyukjinKwon <gu...@apache.org>
---
.../main/scala/org/apache/spark/SparkConf.scala | 8 ++++----
.../main/scala/org/apache/spark/api/r/RUtils.scala | 2 +-
.../scala/org/apache/spark/deploy/RRunner.scala | 3 ++-
.../org/apache/spark/deploy/SparkSubmit.scala | 6 +++---
.../apache/spark/deploy/worker/DriverWrapper.scala | 2 +-
.../scala/org/apache/spark/SparkConfSuite.scala | 2 +-
.../scala/org/apache/spark/SparkContextSuite.scala | 2 +-
.../org/apache/spark/deploy/SparkSubmitSuite.scala | 4 ++--
.../deploy/StandaloneDynamicAllocationSuite.scala | 6 +++---
.../deploy/rest/SubmitRestProtocolSuite.scala | 24 +++++++++++-----------
.../storage/BlockManagerReplicationSuite.scala | 6 +++---
.../apache/spark/storage/BlockManagerSuite.scala | 5 +++--
.../apache/spark/storage/MemoryStoreSuite.scala | 5 +++--
.../org/apache/spark/ml/feature/Word2Vec.scala | 3 ++-
.../org/apache/spark/mllib/feature/Word2Vec.scala | 3 ++-
.../cluster/mesos/MesosClusterScheduler.scala | 2 +-
.../spark/deploy/yarn/ApplicationMaster.scala | 2 +-
17 files changed, 45 insertions(+), 40 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index aa93f42..bd2ef5b 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -595,7 +595,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
// it will almost always cause ExecutorLostFailure. See SPARK-22754.
require(executorTimeoutThresholdMs > executorHeartbeatIntervalMs, "The value of " +
s"${networkTimeout}=${executorTimeoutThresholdMs}ms must be no less than the value of " +
- s"spark.executor.heartbeatInterval=${executorHeartbeatIntervalMs}ms.")
+ s"${EXECUTOR_HEARTBEAT_INTERVAL.key}=${executorHeartbeatIntervalMs}ms.")
}
/**
@@ -667,12 +667,12 @@ private[spark] object SparkConf extends Logging {
translation = s => s"${s.toLong * 10}s")),
REDUCER_MAX_SIZE_IN_FLIGHT.key -> Seq(
AlternateConfig("spark.reducer.maxMbInFlight", "1.4")),
- "spark.kryoserializer.buffer" -> Seq(
+ KRYO_SERIALIZER_BUFFER_SIZE.key -> Seq(
AlternateConfig("spark.kryoserializer.buffer.mb", "1.4",
translation = s => s"${(s.toDouble * 1000).toInt}k")),
- "spark.kryoserializer.buffer.max" -> Seq(
+ KRYO_SERIALIZER_MAX_BUFFER_SIZE.key -> Seq(
AlternateConfig("spark.kryoserializer.buffer.max.mb", "1.4")),
- "spark.shuffle.file.buffer" -> Seq(
+ SHUFFLE_FILE_BUFFER_SIZE.key -> Seq(
AlternateConfig("spark.shuffle.file.buffer.kb", "1.4")),
EXECUTOR_LOGS_ROLLING_MAX_SIZE.key -> Seq(
AlternateConfig("spark.executor.logs.rolling.size.maxBytes", "1.4")),
diff --git a/core/src/main/scala/org/apache/spark/api/r/RUtils.scala b/core/src/main/scala/org/apache/spark/api/r/RUtils.scala
index 5a43302..311fade 100644
--- a/core/src/main/scala/org/apache/spark/api/r/RUtils.scala
+++ b/core/src/main/scala/org/apache/spark/api/r/RUtils.scala
@@ -60,7 +60,7 @@ private[spark] object RUtils {
def sparkRPackagePath(isDriver: Boolean): Seq[String] = {
val (master, deployMode) =
if (isDriver) {
- (sys.props("spark.master"), sys.props("spark.submit.deployMode"))
+ (sys.props("spark.master"), sys.props(SUBMIT_DEPLOY_MODE.key))
} else {
val sparkConf = SparkEnv.get.conf
(sparkConf.get("spark.master"), sparkConf.get(SUBMIT_DEPLOY_MODE))
diff --git a/core/src/main/scala/org/apache/spark/deploy/RRunner.scala b/core/src/main/scala/org/apache/spark/deploy/RRunner.scala
index 6284e6a..60ba047 100644
--- a/core/src/main/scala/org/apache/spark/deploy/RRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/RRunner.scala
@@ -27,6 +27,7 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.{SparkException, SparkUserAppException}
import org.apache.spark.api.r.{RBackend, RUtils}
import org.apache.spark.internal.config.R._
+import org.apache.spark.internal.config.SUBMIT_DEPLOY_MODE
import org.apache.spark.util.RedirectThread
/**
@@ -46,7 +47,7 @@ object RRunner {
// but kept here for backward compatibility.
var cmd = sys.props.getOrElse(SPARKR_COMMAND.key, SPARKR_COMMAND.defaultValue.get)
cmd = sys.props.getOrElse(R_COMMAND.key, cmd)
- if (sys.props.getOrElse("spark.submit.deployMode", "client") == "client") {
+ if (sys.props.getOrElse(SUBMIT_DEPLOY_MODE.key, "client") == "client") {
cmd = sys.props.getOrElse("spark.r.driver.command", cmd)
}
cmd
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 49d9395..59b638b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -517,7 +517,7 @@ private[spark] class SparkSubmit extends Logging {
// All cluster managers
OptionAssigner(args.master, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, confKey = "spark.master"),
OptionAssigner(args.deployMode, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
- confKey = "spark.submit.deployMode"),
+ confKey = SUBMIT_DEPLOY_MODE.key),
OptionAssigner(args.name, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, confKey = "spark.app.name"),
OptionAssigner(args.ivyRepoPath, ALL_CLUSTER_MGRS, CLIENT, confKey = "spark.jars.ivy"),
OptionAssigner(args.driverMemory, ALL_CLUSTER_MGRS, CLIENT,
@@ -747,8 +747,8 @@ private[spark] class SparkSubmit extends Logging {
// Resolve paths in certain spark properties
val pathConfigs = Seq(
- "spark.jars",
- "spark.files",
+ JARS.key,
+ FILES.key,
"spark.yarn.dist.files",
"spark.yarn.dist.archives",
"spark.yarn.dist.jars")
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
index 1e8ad0b..9b51bea 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
@@ -91,7 +91,7 @@ object DriverWrapper extends Logging {
val resolvedMavenCoordinates = DependencyUtils.resolveMavenDependencies(packagesExclusions,
packages, repositories, ivyRepoPath, Option(ivySettingsPath))
val jars = {
- val jarsProp = sys.props.get("spark.jars").orNull
+ val jarsProp = sys.props.get(config.JARS.key).orNull
if (!StringUtils.isBlank(resolvedMavenCoordinates)) {
DependencyUtils.mergeFileLists(jarsProp, resolvedMavenCoordinates)
} else {
diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
index 3a5de8a..83a9ea3 100644
--- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
@@ -248,7 +248,7 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst
assert(conf.getTimeAsSeconds("spark.yarn.am.waitTime") === 420)
conf.set("spark.kryoserializer.buffer.mb", "1.1")
- assert(conf.getSizeAsKb("spark.kryoserializer.buffer") === 1100)
+ assert(conf.getSizeAsKb(KRYO_SERIALIZER_BUFFER_SIZE.key) === 1100)
conf.set("spark.history.fs.cleaner.maxAge.seconds", "42")
assert(conf.get(MAX_LOG_AGE_S) === 42L)
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index 3490eaf..1dcb2f7 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -671,7 +671,7 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
// task metrics via heartbeat to driver.
.set(EXECUTOR_HEARTBEAT_DROP_ZERO_ACCUMULATOR_UPDATES.key, "false")
// Set a short heartbeat interval to send SparkListenerExecutorMetricsUpdate fast
- .set("spark.executor.heartbeatInterval", "1s")
+ .set(EXECUTOR_HEARTBEAT_INTERVAL.key, "1s")
sc = new SparkContext(conf)
sc.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "true")
@volatile var runningTaskIds: Seq[Long] = null
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index ef6213e..b0c187d 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -375,8 +375,8 @@ class SparkSubmitSuite
confMap.keys should contain ("spark.app.name")
confMap.keys should contain (JARS.key)
confMap.keys should contain ("spark.driver.memory")
- confMap.keys should contain ("spark.driver.cores")
- confMap.keys should contain ("spark.driver.supervise")
+ confMap.keys should contain (DRIVER_CORES.key)
+ confMap.keys should contain (DRIVER_SUPERVISE.key)
confMap.keys should contain (UI_ENABLED.key)
confMap.keys should contain (SUBMIT_DEPLOY_MODE.key)
conf.get(UI_ENABLED) should be (false)
diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
index 39daf51..4c6a669 100644
--- a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
@@ -135,7 +135,7 @@ class StandaloneDynamicAllocationSuite
}
test("dynamic allocation with max cores <= cores per worker") {
- sc = new SparkContext(appConf.set("spark.cores.max", "8"))
+ sc = new SparkContext(appConf.set(config.CORES_MAX, 8))
val appId = sc.applicationId
eventually(timeout(10.seconds), interval(10.millis)) {
val apps = getApplications()
@@ -190,7 +190,7 @@ class StandaloneDynamicAllocationSuite
}
test("dynamic allocation with max cores > cores per worker") {
- sc = new SparkContext(appConf.set("spark.cores.max", "16"))
+ sc = new SparkContext(appConf.set(config.CORES_MAX, 16))
val appId = sc.applicationId
eventually(timeout(10.seconds), interval(10.millis)) {
val apps = getApplications()
@@ -297,7 +297,7 @@ class StandaloneDynamicAllocationSuite
test("dynamic allocation with cores per executor AND max cores") {
sc = new SparkContext(appConf
.set(config.EXECUTOR_CORES, 2)
- .set("spark.cores.max", "8"))
+ .set(config.CORES_MAX, 8))
val appId = sc.applicationId
eventually(timeout(10.seconds), interval(10.millis)) {
val apps = getApplications()
diff --git a/core/src/test/scala/org/apache/spark/deploy/rest/SubmitRestProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/rest/SubmitRestProtocolSuite.scala
index 87655f3..03102fd 100644
--- a/core/src/test/scala/org/apache/spark/deploy/rest/SubmitRestProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/rest/SubmitRestProtocolSuite.scala
@@ -95,27 +95,27 @@ class SubmitRestProtocolSuite extends SparkFunSuite {
message.validate()
// optional fields
conf.set(JARS, Seq("mayonnaise.jar", "ketchup.jar"))
- conf.set("spark.files", "fireball.png")
+ conf.set(FILES.key, "fireball.png")
conf.set("spark.driver.memory", s"${Utils.DEFAULT_DRIVER_MEM_MB}m")
- conf.set("spark.driver.cores", "180")
+ conf.set(DRIVER_CORES, 180)
conf.set("spark.driver.extraJavaOptions", " -Dslices=5 -Dcolor=mostly_red")
conf.set("spark.driver.extraClassPath", "food-coloring.jar")
conf.set("spark.driver.extraLibraryPath", "pickle.jar")
- conf.set("spark.driver.supervise", "false")
+ conf.set(DRIVER_SUPERVISE, false)
conf.set("spark.executor.memory", "256m")
- conf.set("spark.cores.max", "10000")
+ conf.set(CORES_MAX, 10000)
message.sparkProperties = conf.getAll.toMap
message.appArgs = Array("two slices", "a hint of cinnamon")
message.environmentVariables = Map("PATH" -> "/dev/null")
message.validate()
// bad fields
- var badConf = conf.clone().set("spark.driver.cores", "one hundred feet")
+ var badConf = conf.clone().set(DRIVER_CORES.key, "one hundred feet")
message.sparkProperties = badConf.getAll.toMap
intercept[SubmitRestProtocolException] { message.validate() }
- badConf = conf.clone().set("spark.driver.supervise", "nope, never")
+ badConf = conf.clone().set(DRIVER_SUPERVISE.key, "nope, never")
message.sparkProperties = badConf.getAll.toMap
intercept[SubmitRestProtocolException] { message.validate() }
- badConf = conf.clone().set("spark.cores.max", "two men")
+ badConf = conf.clone().set(CORES_MAX.key, "two men")
message.sparkProperties = badConf.getAll.toMap
intercept[SubmitRestProtocolException] { message.validate() }
message.sparkProperties = conf.getAll.toMap
@@ -127,17 +127,17 @@ class SubmitRestProtocolSuite extends SparkFunSuite {
assert(newMessage.appResource === "honey-walnut-cherry.jar")
assert(newMessage.mainClass === "org.apache.spark.examples.SparkPie")
assert(newMessage.sparkProperties("spark.app.name") === "SparkPie")
- assert(newMessage.sparkProperties("spark.jars") === "mayonnaise.jar,ketchup.jar")
- assert(newMessage.sparkProperties("spark.files") === "fireball.png")
+ assert(newMessage.sparkProperties(JARS.key) === "mayonnaise.jar,ketchup.jar")
+ assert(newMessage.sparkProperties(FILES.key) === "fireball.png")
assert(newMessage.sparkProperties("spark.driver.memory") === s"${Utils.DEFAULT_DRIVER_MEM_MB}m")
- assert(newMessage.sparkProperties("spark.driver.cores") === "180")
+ assert(newMessage.sparkProperties(DRIVER_CORES.key) === "180")
assert(newMessage.sparkProperties("spark.driver.extraJavaOptions") ===
" -Dslices=5 -Dcolor=mostly_red")
assert(newMessage.sparkProperties("spark.driver.extraClassPath") === "food-coloring.jar")
assert(newMessage.sparkProperties("spark.driver.extraLibraryPath") === "pickle.jar")
- assert(newMessage.sparkProperties("spark.driver.supervise") === "false")
+ assert(newMessage.sparkProperties(DRIVER_SUPERVISE.key) === "false")
assert(newMessage.sparkProperties("spark.executor.memory") === "256m")
- assert(newMessage.sparkProperties("spark.cores.max") === "10000")
+ assert(newMessage.sparkProperties(CORES_MAX.key) === "10000")
assert(newMessage.appArgs === message.appArgs)
assert(newMessage.sparkProperties === message.sparkProperties)
assert(newMessage.environmentVariables === message.environmentVariables)
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
index a739701..25c7ef3 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
@@ -416,12 +416,12 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite
class BlockManagerReplicationSuite extends BlockManagerReplicationBehavior {
val conf = new SparkConf(false).set("spark.app.id", "test")
- conf.set("spark.kryoserializer.buffer", "1m")
+ conf.set(Kryo.KRYO_SERIALIZER_BUFFER_SIZE.key, "1m")
}
class BlockManagerProactiveReplicationSuite extends BlockManagerReplicationBehavior {
val conf = new SparkConf(false).set("spark.app.id", "test")
- conf.set("spark.kryoserializer.buffer", "1m")
+ conf.set(Kryo.KRYO_SERIALIZER_BUFFER_SIZE.key, "1m")
conf.set(STORAGE_REPLICATION_PROACTIVE, true)
conf.set(STORAGE_EXCEPTION_PIN_LEAK, true)
@@ -496,7 +496,7 @@ class DummyTopologyMapper(conf: SparkConf) extends TopologyMapper(conf) with Log
class BlockManagerBasicStrategyReplicationSuite extends BlockManagerReplicationBehavior {
val conf: SparkConf = new SparkConf(false).set("spark.app.id", "test")
- conf.set("spark.kryoserializer.buffer", "1m")
+ conf.set(Kryo.KRYO_SERIALIZER_BUFFER_SIZE.key, "1m")
conf.set(
STORAGE_REPLICATION_POLICY,
classOf[BasicBlockReplicationPolicy].getName)
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 9f3d8f2..78f24fa 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -74,7 +74,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
val shuffleManager = new SortShuffleManager(new SparkConf(false))
// Reuse a serializer across tests to avoid creating a new thread-local buffer on each test
- val serializer = new KryoSerializer(new SparkConf(false).set("spark.kryoserializer.buffer", "1m"))
+ val serializer = new KryoSerializer(
+ new SparkConf(false).set(Kryo.KRYO_SERIALIZER_BUFFER_SIZE.key, "1m"))
// Implicitly convert strings to BlockIds for test clarity.
implicit def StringToBlockId(value: String): BlockId = new TestBlockId(value)
@@ -86,7 +87,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
.set(IS_TESTING, true)
.set(MEMORY_FRACTION, 1.0)
.set(MEMORY_STORAGE_FRACTION, 0.999)
- .set("spark.kryoserializer.buffer", "1m")
+ .set(Kryo.KRYO_SERIALIZER_BUFFER_SIZE.key, "1m")
.set(STORAGE_UNROLL_MEMORY_THRESHOLD, 512L)
}
diff --git a/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala b/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala
index 958d57d..a723141 100644
--- a/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala
@@ -25,7 +25,7 @@ import scala.reflect.ClassTag
import org.scalatest._
import org.apache.spark._
-import org.apache.spark.internal.config.STORAGE_UNROLL_MEMORY_THRESHOLD
+import org.apache.spark.internal.config._
import org.apache.spark.memory.{MemoryMode, UnifiedMemoryManager}
import org.apache.spark.serializer.{KryoSerializer, SerializerManager}
import org.apache.spark.storage.memory.{BlockEvictionHandler, MemoryStore, PartiallySerializedBlock, PartiallyUnrolledIterator}
@@ -42,7 +42,8 @@ class MemoryStoreSuite
.set(STORAGE_UNROLL_MEMORY_THRESHOLD, 512L)
// Reuse a serializer across tests to avoid creating a new thread-local buffer on each test
- val serializer = new KryoSerializer(new SparkConf(false).set("spark.kryoserializer.buffer", "1m"))
+ val serializer = new KryoSerializer(
+ new SparkConf(false).set(Kryo.KRYO_SERIALIZER_BUFFER_SIZE.key, "1m"))
val serializerManager = new SerializerManager(serializer, conf)
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala
index fc9996d..6ae90b8 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala
@@ -20,6 +20,7 @@ package org.apache.spark.ml.feature
import org.apache.hadoop.fs.Path
import org.apache.spark.annotation.Since
+import org.apache.spark.internal.config.Kryo.KRYO_SERIALIZER_MAX_BUFFER_SIZE
import org.apache.spark.ml.{Estimator, Model}
import org.apache.spark.ml.linalg.{BLAS, Vector, Vectors, VectorUDT}
import org.apache.spark.ml.param._
@@ -339,7 +340,7 @@ object Word2VecModel extends MLReadable[Word2VecModel] {
val wordVectors = instance.wordVectors.getVectors
val dataPath = new Path(path, "data").toString
val bufferSizeInBytes = Utils.byteStringAsBytes(
- sc.conf.get("spark.kryoserializer.buffer.max", "64m"))
+ sc.conf.get(KRYO_SERIALIZER_MAX_BUFFER_SIZE.key, "64m"))
val numPartitions = Word2VecModelWriter.calculateNumberOfPartitions(
bufferSizeInBytes, instance.wordVectors.wordIndex.size, instance.getVectorSize)
val spark = sparkSession
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala
index 94c4fcc..9e19ff2 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala
@@ -32,6 +32,7 @@ import org.apache.spark.annotation.Since
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.Kryo.KRYO_SERIALIZER_MAX_BUFFER_SIZE
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.util.{Loader, Saveable}
import org.apache.spark.rdd._
@@ -679,7 +680,7 @@ object Word2VecModel extends Loader[Word2VecModel] {
// We want to partition the model in partitions smaller than
// spark.kryoserializer.buffer.max
val bufferSize = Utils.byteStringAsBytes(
- spark.conf.get("spark.kryoserializer.buffer.max", "64m"))
+ spark.conf.get(KRYO_SERIALIZER_MAX_BUFFER_SIZE.key, "64m"))
// We calculate the approximate size of the model
// We only calculate the array size, considering an
// average string size of 15 bytes, the formula is:
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
index 8566a30..289b109 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -541,7 +541,7 @@ private[spark] class MesosClusterScheduler(
// --conf
val replicatedOptionsBlacklist = Set(
JARS.key, // Avoids duplicate classes in classpath
- "spark.submit.deployMode", // this would be set to `cluster`, but we need client
+ SUBMIT_DEPLOY_MODE.key, // this would be set to `cluster`, but we need client
"spark.master" // this contains the address of the dispatcher, not master
)
val defaultConf = conf.getAllWithPrefix(config.DISPATCHER_DRIVER_DEFAULT_PREFIX).toMap
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index e4b6b3d..5a67caf 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -215,7 +215,7 @@ private[spark] class ApplicationMaster(
// Set the master and deploy mode property to match the requested mode.
System.setProperty("spark.master", "yarn")
- System.setProperty("spark.submit.deployMode", "cluster")
+ System.setProperty(SUBMIT_DEPLOY_MODE.key, "cluster")
// Set this internal configuration if it is running on cluster mode, this
// configuration will be checked in SparkContext to avoid misuse of yarn cluster mode.
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org