You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by va...@apache.org on 2019/01/03 22:30:39 UTC
[spark] branch master updated: [SPARK-26489][CORE] Use ConfigEntry
for hardcoded configs for python/r categories
This is an automated email from the ASF dual-hosted git repository.
vanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 05372d1 [SPARK-26489][CORE] Use ConfigEntry for hardcoded configs for python/r categories
05372d1 is described below
commit 05372d188aeaeff5e8de8866ec6e7b932bafa70f
Author: Jungtaek Lim (HeartSaVioR) <ka...@gmail.com>
AuthorDate: Thu Jan 3 14:30:27 2019 -0800
[SPARK-26489][CORE] Use ConfigEntry for hardcoded configs for python/r categories
## What changes were proposed in this pull request?
This PR changes the hardcoded configs in the categories below to use ConfigEntry.
* spark.pyspark
* spark.python
* spark.r
This patch doesn't change configs that are not relevant to SparkConf (e.g. system properties, Python source code).
## How was this patch tested?
Existing tests.
Closes #23428 from HeartSaVioR/SPARK-26489.
Authored-by: Jungtaek Lim (HeartSaVioR) <ka...@gmail.com>
Signed-off-by: Marcelo Vanzin <va...@cloudera.com>
---
.../org/apache/spark/api/python/PythonRunner.scala | 6 +--
.../spark/api/python/PythonWorkerFactory.scala | 15 +++----
.../scala/org/apache/spark/api/r/RBackend.scala | 10 ++---
.../org/apache/spark/api/r/RBackendHandler.scala | 7 ++--
.../scala/org/apache/spark/api/r/RRunner.scala | 8 ++--
.../scala/org/apache/spark/deploy/RRunner.scala | 9 +++--
.../org/apache/spark/internal/config/Python.scala | 47 ++++++++++++++++++++++
.../config/R.scala} | 26 ++++++++----
.../org/apache/spark/internal/config/package.scala | 4 --
.../k8s/features/BasicExecutorFeatureStep.scala | 1 +
.../features/BasicExecutorFeatureStepSuite.scala | 1 +
.../org/apache/spark/deploy/yarn/Client.scala | 1 +
.../apache/spark/deploy/yarn/YarnAllocator.scala | 1 +
13 files changed, 96 insertions(+), 40 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
index f73e95e..6b748c8 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
@@ -27,7 +27,7 @@ import scala.collection.JavaConverters._
import org.apache.spark._
import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config.PYSPARK_EXECUTOR_MEMORY
+import org.apache.spark.internal.config.Python._
import org.apache.spark.security.SocketAuthHelper
import org.apache.spark.util._
@@ -71,7 +71,7 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
private val conf = SparkEnv.get.conf
private val bufferSize = conf.getInt("spark.buffer.size", 65536)
- private val reuseWorker = conf.getBoolean("spark.python.worker.reuse", true)
+ private val reuseWorker = conf.get(PYTHON_WORKER_REUSE)
// each python worker gets an equal part of the allocation. the worker pool will grow to the
// number of concurrent tasks, which is determined by the number of cores in this executor.
private val memoryMb = conf.get(PYSPARK_EXECUTOR_MEMORY)
@@ -496,7 +496,7 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
extends Thread(s"Worker Monitor for $pythonExec") {
/** How long to wait before killing the python worker if a task cannot be interrupted. */
- private val taskKillTimeout = env.conf.getTimeAsMs("spark.python.task.killTimeout", "2s")
+ private val taskKillTimeout = env.conf.get(PYTHON_TASK_KILL_TIMEOUT)
setDaemon(true)
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
index 1f2f503..09e219f 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
@@ -28,6 +28,7 @@ import scala.collection.mutable
import org.apache.spark._
import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.Python._
import org.apache.spark.security.SocketAuthHelper
import org.apache.spark.util.{RedirectThread, Utils}
@@ -41,7 +42,7 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
// currently only works on UNIX-based systems now because it uses signals for child management,
// so we can also fall back to launching workers, pyspark/worker.py (by default) directly.
private val useDaemon = {
- val useDaemonEnabled = SparkEnv.get.conf.getBoolean("spark.python.use.daemon", true)
+ val useDaemonEnabled = SparkEnv.get.conf.get(PYTHON_USE_DAEMON)
// This flag is ignored on Windows as it's unable to fork.
!System.getProperty("os.name").startsWith("Windows") && useDaemonEnabled
@@ -53,21 +54,21 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
// This configuration indicates the module to run the daemon to execute its Python workers.
private val daemonModule =
- SparkEnv.get.conf.getOption("spark.python.daemon.module").map { value =>
+ SparkEnv.get.conf.get(PYTHON_DAEMON_MODULE).map { value =>
logInfo(
- s"Python daemon module in PySpark is set to [$value] in 'spark.python.daemon.module', " +
+ s"Python daemon module in PySpark is set to [$value] in '${PYTHON_DAEMON_MODULE.key}', " +
"using this to start the daemon up. Note that this configuration only has an effect when " +
- "'spark.python.use.daemon' is enabled and the platform is not Windows.")
+ s"'${PYTHON_USE_DAEMON.key}' is enabled and the platform is not Windows.")
value
}.getOrElse("pyspark.daemon")
// This configuration indicates the module to run each Python worker.
private val workerModule =
- SparkEnv.get.conf.getOption("spark.python.worker.module").map { value =>
+ SparkEnv.get.conf.get(PYTHON_WORKER_MODULE).map { value =>
logInfo(
- s"Python worker module in PySpark is set to [$value] in 'spark.python.worker.module', " +
+ s"Python worker module in PySpark is set to [$value] in '${PYTHON_WORKER_MODULE.key}', " +
"using this to start the worker up. Note that this configuration only has an effect when " +
- "'spark.python.use.daemon' is disabled or the platform is Windows.")
+ s"'${PYTHON_USE_DAEMON.key}' is disabled or the platform is Windows.")
value
}.getOrElse("pyspark.worker")
diff --git a/core/src/main/scala/org/apache/spark/api/r/RBackend.scala b/core/src/main/scala/org/apache/spark/api/r/RBackend.scala
index 50c8fdf..36b4132 100644
--- a/core/src/main/scala/org/apache/spark/api/r/RBackend.scala
+++ b/core/src/main/scala/org/apache/spark/api/r/RBackend.scala
@@ -32,6 +32,7 @@ import io.netty.handler.timeout.ReadTimeoutHandler
import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.R._
/**
* Netty-based backend server that is used to communicate between R and Java.
@@ -47,10 +48,8 @@ private[spark] class RBackend {
def init(): (Int, RAuthHelper) = {
val conf = new SparkConf()
- val backendConnectionTimeout = conf.getInt(
- "spark.r.backendConnectionTimeout", SparkRDefaults.DEFAULT_CONNECTION_TIMEOUT)
- bossGroup = new NioEventLoopGroup(
- conf.getInt("spark.r.numRBackendThreads", SparkRDefaults.DEFAULT_NUM_RBACKEND_THREADS))
+ val backendConnectionTimeout = conf.get(R_BACKEND_CONNECTION_TIMEOUT)
+ bossGroup = new NioEventLoopGroup(conf.get(R_NUM_BACKEND_THREADS))
val workerGroup = bossGroup
val handler = new RBackendHandler(this)
val authHelper = new RAuthHelper(conf)
@@ -126,8 +125,7 @@ private[spark] object RBackend extends Logging {
// Connection timeout is set by socket client. To make it configurable we will pass the
// timeout value to client inside the temp file
val conf = new SparkConf()
- val backendConnectionTimeout = conf.getInt(
- "spark.r.backendConnectionTimeout", SparkRDefaults.DEFAULT_CONNECTION_TIMEOUT)
+ val backendConnectionTimeout = conf.get(R_BACKEND_CONNECTION_TIMEOUT)
// tell the R process via temporary file
val path = args(0)
diff --git a/core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala b/core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala
index 18fc595..7b74efa 100644
--- a/core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala
+++ b/core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala
@@ -29,6 +29,7 @@ import io.netty.handler.timeout.ReadTimeoutException
import org.apache.spark.SparkConf
import org.apache.spark.api.r.SerDe._
import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.R._
import org.apache.spark.util.{ThreadUtils, Utils}
/**
@@ -98,10 +99,8 @@ private[r] class RBackendHandler(server: RBackend)
}
}
val conf = new SparkConf()
- val heartBeatInterval = conf.getInt(
- "spark.r.heartBeatInterval", SparkRDefaults.DEFAULT_HEARTBEAT_INTERVAL)
- val backendConnectionTimeout = conf.getInt(
- "spark.r.backendConnectionTimeout", SparkRDefaults.DEFAULT_CONNECTION_TIMEOUT)
+ val heartBeatInterval = conf.get(R_HEARTBEAT_INTERVAL)
+ val backendConnectionTimeout = conf.get(R_BACKEND_CONNECTION_TIMEOUT)
val interval = Math.min(heartBeatInterval, backendConnectionTimeout - 1)
execService.scheduleAtFixedRate(pingRunner, interval, interval, TimeUnit.SECONDS)
diff --git a/core/src/main/scala/org/apache/spark/api/r/RRunner.scala b/core/src/main/scala/org/apache/spark/api/r/RRunner.scala
index e7fdc39..3fdea04 100644
--- a/core/src/main/scala/org/apache/spark/api/r/RRunner.scala
+++ b/core/src/main/scala/org/apache/spark/api/r/RRunner.scala
@@ -27,6 +27,7 @@ import scala.util.Try
import org.apache.spark._
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.R._
import org.apache.spark.util.Utils
/**
@@ -340,11 +341,10 @@ private[r] object RRunner {
// "spark.sparkr.r.command" is deprecated and replaced by "spark.r.command",
// but kept here for backward compatibility.
val sparkConf = SparkEnv.get.conf
- var rCommand = sparkConf.get("spark.sparkr.r.command", "Rscript")
- rCommand = sparkConf.get("spark.r.command", rCommand)
+ var rCommand = sparkConf.get(SPARKR_COMMAND)
+ rCommand = sparkConf.get(R_COMMAND).orElse(Some(rCommand)).get
- val rConnectionTimeout = sparkConf.getInt(
- "spark.r.backendConnectionTimeout", SparkRDefaults.DEFAULT_CONNECTION_TIMEOUT)
+ val rConnectionTimeout = sparkConf.get(R_BACKEND_CONNECTION_TIMEOUT)
val rOptions = "--vanilla"
val rLibDir = RUtils.sparkRPackagePath(isDriver = false)
val rExecScript = rLibDir(0) + "/SparkR/worker/" + script
diff --git a/core/src/main/scala/org/apache/spark/deploy/RRunner.scala b/core/src/main/scala/org/apache/spark/deploy/RRunner.scala
index e86b362..6284e6a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/RRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/RRunner.scala
@@ -25,7 +25,8 @@ import scala.collection.JavaConverters._
import org.apache.hadoop.fs.Path
import org.apache.spark.{SparkException, SparkUserAppException}
-import org.apache.spark.api.r.{RBackend, RUtils, SparkRDefaults}
+import org.apache.spark.api.r.{RBackend, RUtils}
+import org.apache.spark.internal.config.R._
import org.apache.spark.util.RedirectThread
/**
@@ -43,8 +44,8 @@ object RRunner {
val rCommand = {
// "spark.sparkr.r.command" is deprecated and replaced by "spark.r.command",
// but kept here for backward compatibility.
- var cmd = sys.props.getOrElse("spark.sparkr.r.command", "Rscript")
- cmd = sys.props.getOrElse("spark.r.command", cmd)
+ var cmd = sys.props.getOrElse(SPARKR_COMMAND.key, SPARKR_COMMAND.defaultValue.get)
+ cmd = sys.props.getOrElse(R_COMMAND.key, cmd)
if (sys.props.getOrElse("spark.submit.deployMode", "client") == "client") {
cmd = sys.props.getOrElse("spark.r.driver.command", cmd)
}
@@ -53,7 +54,7 @@ object RRunner {
// Connection timeout set by R process on its connection to RBackend in seconds.
val backendConnectionTimeout = sys.props.getOrElse(
- "spark.r.backendConnectionTimeout", SparkRDefaults.DEFAULT_CONNECTION_TIMEOUT.toString)
+ R_BACKEND_CONNECTION_TIMEOUT.key, R_BACKEND_CONNECTION_TIMEOUT.defaultValue.get.toString)
// Check if the file path exists.
// If not, change directory to current working directory for YARN cluster mode
diff --git a/core/src/main/scala/org/apache/spark/internal/config/Python.scala b/core/src/main/scala/org/apache/spark/internal/config/Python.scala
new file mode 100644
index 0000000..26a0598
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/internal/config/Python.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.internal.config
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.spark.network.util.ByteUnit
+
+private[spark] object Python {
+ val PYTHON_WORKER_REUSE = ConfigBuilder("spark.python.worker.reuse")
+ .booleanConf
+ .createWithDefault(true)
+
+ val PYTHON_TASK_KILL_TIMEOUT = ConfigBuilder("spark.python.task.killTimeout")
+ .timeConf(TimeUnit.MILLISECONDS)
+ .createWithDefaultString("2s")
+
+ val PYTHON_USE_DAEMON = ConfigBuilder("spark.python.use.daemon")
+ .booleanConf
+ .createWithDefault(true)
+
+ val PYTHON_DAEMON_MODULE = ConfigBuilder("spark.python.daemon.module")
+ .stringConf
+ .createOptional
+
+ val PYTHON_WORKER_MODULE = ConfigBuilder("spark.python.worker.module")
+ .stringConf
+ .createOptional
+
+ val PYSPARK_EXECUTOR_MEMORY = ConfigBuilder("spark.executor.pyspark.memory")
+ .bytesConf(ByteUnit.MiB)
+ .createOptional
+}
diff --git a/core/src/main/scala/org/apache/spark/api/r/SparkRDefaults.scala b/core/src/main/scala/org/apache/spark/internal/config/R.scala
similarity index 56%
rename from core/src/main/scala/org/apache/spark/api/r/SparkRDefaults.scala
rename to core/src/main/scala/org/apache/spark/internal/config/R.scala
index af67cbb..26e06a5 100644
--- a/core/src/main/scala/org/apache/spark/api/r/SparkRDefaults.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/R.scala
@@ -14,17 +14,27 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.spark.internal.config
-package org.apache.spark.api.r
+private[spark] object R {
-private[spark] object SparkRDefaults {
+ val R_BACKEND_CONNECTION_TIMEOUT = ConfigBuilder("spark.r.backendConnectionTimeout")
+ .intConf
+ .createWithDefault(6000)
- // Default value for spark.r.backendConnectionTimeout config
- val DEFAULT_CONNECTION_TIMEOUT: Int = 6000
+ val R_NUM_BACKEND_THREADS = ConfigBuilder("spark.r.numRBackendThreads")
+ .intConf
+ .createWithDefault(2)
- // Default value for spark.r.heartBeatInterval config
- val DEFAULT_HEARTBEAT_INTERVAL: Int = 100
+ val R_HEARTBEAT_INTERVAL = ConfigBuilder("spark.r.heartBeatInterval")
+ .intConf
+ .createWithDefault(100)
- // Default value for spark.r.numRBackendThreads config
- val DEFAULT_NUM_RBACKEND_THREADS = 2
+ val SPARKR_COMMAND = ConfigBuilder("spark.sparkr.r.command")
+ .stringConf
+ .createWithDefault("Rscript")
+
+ val R_COMMAND = ConfigBuilder("spark.r.command")
+ .stringConf
+ .createOptional
}
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index d8e9c09..da80604 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -166,10 +166,6 @@ package object config {
.checkValue(_ >= 0, "The off-heap memory size must not be negative")
.createWithDefault(0)
- private[spark] val PYSPARK_EXECUTOR_MEMORY = ConfigBuilder("spark.executor.pyspark.memory")
- .bytesConf(ByteUnit.MiB)
- .createOptional
-
private[spark] val IS_PYTHON_APP = ConfigBuilder("spark.yarn.isPython").internal()
.booleanConf.createWithDefault(false)
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
index c8bf7cd..dd73a5e 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
@@ -25,6 +25,7 @@ import org.apache.spark.deploy.k8s._
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.internal.config._
+import org.apache.spark.internal.config.Python._
import org.apache.spark.rpc.RpcEndpointAddress
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.util.Utils
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
index c2efab0..e28c650 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
@@ -30,6 +30,7 @@ import org.apache.spark.deploy.k8s.{KubernetesExecutorConf, KubernetesTestConf,
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.internal.config._
+import org.apache.spark.internal.config.Python._
import org.apache.spark.rpc.RpcEndpointAddress
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.util.Utils
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 184fb6a..44a60b8 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -53,6 +53,7 @@ import org.apache.spark.deploy.yarn.config._
import org.apache.spark.deploy.yarn.security.YARNHadoopDelegationTokenManager
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
+import org.apache.spark.internal.config.Python._
import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle, YarnCommandBuilderUtils}
import org.apache.spark.util.{CallerContext, Utils}
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index a3feca5..8c6eff99 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -36,6 +36,7 @@ import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
import org.apache.spark.deploy.yarn.config._
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
+import org.apache.spark.internal.config.Python._
import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef}
import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason}
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RemoveExecutor
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org