Posted to commits@spark.apache.org by va...@apache.org on 2019/01/03 22:30:39 UTC

[spark] branch master updated: [SPARK-26489][CORE] Use ConfigEntry for hardcoded configs for python/r categories

This is an automated email from the ASF dual-hosted git repository.

vanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 05372d1  [SPARK-26489][CORE] Use ConfigEntry for hardcoded configs for python/r categories
05372d1 is described below

commit 05372d188aeaeff5e8de8866ec6e7b932bafa70f
Author: Jungtaek Lim (HeartSaVioR) <ka...@gmail.com>
AuthorDate: Thu Jan 3 14:30:27 2019 -0800

    [SPARK-26489][CORE] Use ConfigEntry for hardcoded configs for python/r categories
    
    ## What changes were proposed in this pull request?
    
    This PR changes the hardcoded configs under the prefixes below to use ConfigEntry:
    
    * spark.pyspark
    * spark.python
    * spark.r
    
    This patch doesn't change configs that are not relevant to SparkConf (e.g. system properties, Python source code).
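    
    For reference, a minimal sketch of the pattern applied here, using the Spark-internal ConfigBuilder/ConfigEntry API; the object and method names in the sketch are illustrative only and are not part of this change:
    
    ```scala
    // Sketch only: placed in the internal config package so that the
    // private[spark] ConfigBuilder/ConfigEntry API and the typed
    // SparkConf.get(entry) overload are visible.
    package org.apache.spark.internal.config
    
    import org.apache.spark.SparkConf
    
    // Hypothetical holder object, named here only for illustration.
    private[spark] object PythonConfigSketch {
    
      // Before: the string key and its default are repeated at every call site.
      def reuseWorkerOld(conf: SparkConf): Boolean =
        conf.getBoolean("spark.python.worker.reuse", true)
    
      // After: key, type and default are declared once as a ConfigEntry...
      val PYTHON_WORKER_REUSE = ConfigBuilder("spark.python.worker.reuse")
        .booleanConf
        .createWithDefault(true)
    
      // ...and call sites read a typed value through the entry.
      def reuseWorkerNew(conf: SparkConf): Boolean = conf.get(PYTHON_WORKER_REUSE)
    }
    ```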
    
    ## How was this patch tested?
    
    Existing tests.
    
    Closes #23428 from HeartSaVioR/SPARK-26489.
    
    Authored-by: Jungtaek Lim (HeartSaVioR) <ka...@gmail.com>
    Signed-off-by: Marcelo Vanzin <va...@cloudera.com>
---
 .../org/apache/spark/api/python/PythonRunner.scala |  6 +--
 .../spark/api/python/PythonWorkerFactory.scala     | 15 +++----
 .../scala/org/apache/spark/api/r/RBackend.scala    | 10 ++---
 .../org/apache/spark/api/r/RBackendHandler.scala   |  7 ++--
 .../scala/org/apache/spark/api/r/RRunner.scala     |  8 ++--
 .../scala/org/apache/spark/deploy/RRunner.scala    |  9 +++--
 .../org/apache/spark/internal/config/Python.scala  | 47 ++++++++++++++++++++++
 .../config/R.scala}                                | 26 ++++++++----
 .../org/apache/spark/internal/config/package.scala |  4 --
 .../k8s/features/BasicExecutorFeatureStep.scala    |  1 +
 .../features/BasicExecutorFeatureStepSuite.scala   |  1 +
 .../org/apache/spark/deploy/yarn/Client.scala      |  1 +
 .../apache/spark/deploy/yarn/YarnAllocator.scala   |  1 +
 13 files changed, 96 insertions(+), 40 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
index f73e95e..6b748c8 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
@@ -27,7 +27,7 @@ import scala.collection.JavaConverters._
 
 import org.apache.spark._
 import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config.PYSPARK_EXECUTOR_MEMORY
+import org.apache.spark.internal.config.Python._
 import org.apache.spark.security.SocketAuthHelper
 import org.apache.spark.util._
 
@@ -71,7 +71,7 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
 
   private val conf = SparkEnv.get.conf
   private val bufferSize = conf.getInt("spark.buffer.size", 65536)
-  private val reuseWorker = conf.getBoolean("spark.python.worker.reuse", true)
+  private val reuseWorker = conf.get(PYTHON_WORKER_REUSE)
   // each python worker gets an equal part of the allocation. the worker pool will grow to the
   // number of concurrent tasks, which is determined by the number of cores in this executor.
   private val memoryMb = conf.get(PYSPARK_EXECUTOR_MEMORY)
@@ -496,7 +496,7 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
     extends Thread(s"Worker Monitor for $pythonExec") {
 
     /** How long to wait before killing the python worker if a task cannot be interrupted. */
-    private val taskKillTimeout = env.conf.getTimeAsMs("spark.python.task.killTimeout", "2s")
+    private val taskKillTimeout = env.conf.get(PYTHON_TASK_KILL_TIMEOUT)
 
     setDaemon(true)
 
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
index 1f2f503..09e219f 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
@@ -28,6 +28,7 @@ import scala.collection.mutable
 
 import org.apache.spark._
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.Python._
 import org.apache.spark.security.SocketAuthHelper
 import org.apache.spark.util.{RedirectThread, Utils}
 
@@ -41,7 +42,7 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
   // currently only works on UNIX-based systems now because it uses signals for child management,
   // so we can also fall back to launching workers, pyspark/worker.py (by default) directly.
   private val useDaemon = {
-    val useDaemonEnabled = SparkEnv.get.conf.getBoolean("spark.python.use.daemon", true)
+    val useDaemonEnabled = SparkEnv.get.conf.get(PYTHON_USE_DAEMON)
 
     // This flag is ignored on Windows as it's unable to fork.
     !System.getProperty("os.name").startsWith("Windows") && useDaemonEnabled
@@ -53,21 +54,21 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
 
   // This configuration indicates the module to run the daemon to execute its Python workers.
   private val daemonModule =
-    SparkEnv.get.conf.getOption("spark.python.daemon.module").map { value =>
+    SparkEnv.get.conf.get(PYTHON_DAEMON_MODULE).map { value =>
       logInfo(
-        s"Python daemon module in PySpark is set to [$value] in 'spark.python.daemon.module', " +
+        s"Python daemon module in PySpark is set to [$value] in '${PYTHON_DAEMON_MODULE.key}', " +
         "using this to start the daemon up. Note that this configuration only has an effect when " +
-        "'spark.python.use.daemon' is enabled and the platform is not Windows.")
+        s"'${PYTHON_USE_DAEMON.key}' is enabled and the platform is not Windows.")
       value
     }.getOrElse("pyspark.daemon")
 
   // This configuration indicates the module to run each Python worker.
   private val workerModule =
-    SparkEnv.get.conf.getOption("spark.python.worker.module").map { value =>
+    SparkEnv.get.conf.get(PYTHON_WORKER_MODULE).map { value =>
       logInfo(
-        s"Python worker module in PySpark is set to [$value] in 'spark.python.worker.module', " +
+        s"Python worker module in PySpark is set to [$value] in '${PYTHON_WORKER_MODULE.key}', " +
         "using this to start the worker up. Note that this configuration only has an effect when " +
-        "'spark.python.use.daemon' is disabled or the platform is Windows.")
+        s"'${PYTHON_USE_DAEMON.key}' is disabled or the platform is Windows.")
       value
     }.getOrElse("pyspark.worker")
 
diff --git a/core/src/main/scala/org/apache/spark/api/r/RBackend.scala b/core/src/main/scala/org/apache/spark/api/r/RBackend.scala
index 50c8fdf..36b4132 100644
--- a/core/src/main/scala/org/apache/spark/api/r/RBackend.scala
+++ b/core/src/main/scala/org/apache/spark/api/r/RBackend.scala
@@ -32,6 +32,7 @@ import io.netty.handler.timeout.ReadTimeoutHandler
 
 import org.apache.spark.SparkConf
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.R._
 
 /**
  * Netty-based backend server that is used to communicate between R and Java.
@@ -47,10 +48,8 @@ private[spark] class RBackend {
 
   def init(): (Int, RAuthHelper) = {
     val conf = new SparkConf()
-    val backendConnectionTimeout = conf.getInt(
-      "spark.r.backendConnectionTimeout", SparkRDefaults.DEFAULT_CONNECTION_TIMEOUT)
-    bossGroup = new NioEventLoopGroup(
-      conf.getInt("spark.r.numRBackendThreads", SparkRDefaults.DEFAULT_NUM_RBACKEND_THREADS))
+    val backendConnectionTimeout = conf.get(R_BACKEND_CONNECTION_TIMEOUT)
+    bossGroup = new NioEventLoopGroup(conf.get(R_NUM_BACKEND_THREADS))
     val workerGroup = bossGroup
     val handler = new RBackendHandler(this)
     val authHelper = new RAuthHelper(conf)
@@ -126,8 +125,7 @@ private[spark] object RBackend extends Logging {
       // Connection timeout is set by socket client. To make it configurable we will pass the
       // timeout value to client inside the temp file
       val conf = new SparkConf()
-      val backendConnectionTimeout = conf.getInt(
-        "spark.r.backendConnectionTimeout", SparkRDefaults.DEFAULT_CONNECTION_TIMEOUT)
+      val backendConnectionTimeout = conf.get(R_BACKEND_CONNECTION_TIMEOUT)
 
       // tell the R process via temporary file
       val path = args(0)
diff --git a/core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala b/core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala
index 18fc595..7b74efa 100644
--- a/core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala
+++ b/core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala
@@ -29,6 +29,7 @@ import io.netty.handler.timeout.ReadTimeoutException
 import org.apache.spark.SparkConf
 import org.apache.spark.api.r.SerDe._
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.R._
 import org.apache.spark.util.{ThreadUtils, Utils}
 
 /**
@@ -98,10 +99,8 @@ private[r] class RBackendHandler(server: RBackend)
         }
       }
       val conf = new SparkConf()
-      val heartBeatInterval = conf.getInt(
-        "spark.r.heartBeatInterval", SparkRDefaults.DEFAULT_HEARTBEAT_INTERVAL)
-      val backendConnectionTimeout = conf.getInt(
-        "spark.r.backendConnectionTimeout", SparkRDefaults.DEFAULT_CONNECTION_TIMEOUT)
+      val heartBeatInterval = conf.get(R_HEARTBEAT_INTERVAL)
+      val backendConnectionTimeout = conf.get(R_BACKEND_CONNECTION_TIMEOUT)
       val interval = Math.min(heartBeatInterval, backendConnectionTimeout - 1)
 
       execService.scheduleAtFixedRate(pingRunner, interval, interval, TimeUnit.SECONDS)
diff --git a/core/src/main/scala/org/apache/spark/api/r/RRunner.scala b/core/src/main/scala/org/apache/spark/api/r/RRunner.scala
index e7fdc39..3fdea04 100644
--- a/core/src/main/scala/org/apache/spark/api/r/RRunner.scala
+++ b/core/src/main/scala/org/apache/spark/api/r/RRunner.scala
@@ -27,6 +27,7 @@ import scala.util.Try
 import org.apache.spark._
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.R._
 import org.apache.spark.util.Utils
 
 /**
@@ -340,11 +341,10 @@ private[r] object RRunner {
     // "spark.sparkr.r.command" is deprecated and replaced by "spark.r.command",
     // but kept here for backward compatibility.
     val sparkConf = SparkEnv.get.conf
-    var rCommand = sparkConf.get("spark.sparkr.r.command", "Rscript")
-    rCommand = sparkConf.get("spark.r.command", rCommand)
+    var rCommand = sparkConf.get(SPARKR_COMMAND)
+    rCommand = sparkConf.get(R_COMMAND).orElse(Some(rCommand)).get
 
-    val rConnectionTimeout = sparkConf.getInt(
-      "spark.r.backendConnectionTimeout", SparkRDefaults.DEFAULT_CONNECTION_TIMEOUT)
+    val rConnectionTimeout = sparkConf.get(R_BACKEND_CONNECTION_TIMEOUT)
     val rOptions = "--vanilla"
     val rLibDir = RUtils.sparkRPackagePath(isDriver = false)
     val rExecScript = rLibDir(0) + "/SparkR/worker/" + script
diff --git a/core/src/main/scala/org/apache/spark/deploy/RRunner.scala b/core/src/main/scala/org/apache/spark/deploy/RRunner.scala
index e86b362..6284e6a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/RRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/RRunner.scala
@@ -25,7 +25,8 @@ import scala.collection.JavaConverters._
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.{SparkException, SparkUserAppException}
-import org.apache.spark.api.r.{RBackend, RUtils, SparkRDefaults}
+import org.apache.spark.api.r.{RBackend, RUtils}
+import org.apache.spark.internal.config.R._
 import org.apache.spark.util.RedirectThread
 
 /**
@@ -43,8 +44,8 @@ object RRunner {
     val rCommand = {
       // "spark.sparkr.r.command" is deprecated and replaced by "spark.r.command",
       // but kept here for backward compatibility.
-      var cmd = sys.props.getOrElse("spark.sparkr.r.command", "Rscript")
-      cmd = sys.props.getOrElse("spark.r.command", cmd)
+      var cmd = sys.props.getOrElse(SPARKR_COMMAND.key, SPARKR_COMMAND.defaultValue.get)
+      cmd = sys.props.getOrElse(R_COMMAND.key, cmd)
       if (sys.props.getOrElse("spark.submit.deployMode", "client") == "client") {
         cmd = sys.props.getOrElse("spark.r.driver.command", cmd)
       }
@@ -53,7 +54,7 @@ object RRunner {
 
     //  Connection timeout set by R process on its connection to RBackend in seconds.
     val backendConnectionTimeout = sys.props.getOrElse(
-      "spark.r.backendConnectionTimeout", SparkRDefaults.DEFAULT_CONNECTION_TIMEOUT.toString)
+      R_BACKEND_CONNECTION_TIMEOUT.key, R_BACKEND_CONNECTION_TIMEOUT.defaultValue.get.toString)
 
     // Check if the file path exists.
     // If not, change directory to current working directory for YARN cluster mode
diff --git a/core/src/main/scala/org/apache/spark/internal/config/Python.scala b/core/src/main/scala/org/apache/spark/internal/config/Python.scala
new file mode 100644
index 0000000..26a0598
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/internal/config/Python.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.internal.config
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.spark.network.util.ByteUnit
+
+private[spark] object Python {
+  val PYTHON_WORKER_REUSE = ConfigBuilder("spark.python.worker.reuse")
+    .booleanConf
+    .createWithDefault(true)
+
+  val PYTHON_TASK_KILL_TIMEOUT = ConfigBuilder("spark.python.task.killTimeout")
+    .timeConf(TimeUnit.MILLISECONDS)
+    .createWithDefaultString("2s")
+
+  val PYTHON_USE_DAEMON = ConfigBuilder("spark.python.use.daemon")
+    .booleanConf
+    .createWithDefault(true)
+
+  val PYTHON_DAEMON_MODULE = ConfigBuilder("spark.python.daemon.module")
+    .stringConf
+    .createOptional
+
+  val PYTHON_WORKER_MODULE = ConfigBuilder("spark.python.worker.module")
+    .stringConf
+    .createOptional
+
+  val PYSPARK_EXECUTOR_MEMORY = ConfigBuilder("spark.executor.pyspark.memory")
+    .bytesConf(ByteUnit.MiB)
+    .createOptional
+}
diff --git a/core/src/main/scala/org/apache/spark/api/r/SparkRDefaults.scala b/core/src/main/scala/org/apache/spark/internal/config/R.scala
similarity index 56%
rename from core/src/main/scala/org/apache/spark/api/r/SparkRDefaults.scala
rename to core/src/main/scala/org/apache/spark/internal/config/R.scala
index af67cbb..26e06a5 100644
--- a/core/src/main/scala/org/apache/spark/api/r/SparkRDefaults.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/R.scala
@@ -14,17 +14,27 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.spark.internal.config
 
-package org.apache.spark.api.r
+private[spark] object R {
 
-private[spark] object SparkRDefaults {
+  val R_BACKEND_CONNECTION_TIMEOUT = ConfigBuilder("spark.r.backendConnectionTimeout")
+    .intConf
+    .createWithDefault(6000)
 
-  // Default value for spark.r.backendConnectionTimeout config
-  val DEFAULT_CONNECTION_TIMEOUT: Int = 6000
+  val R_NUM_BACKEND_THREADS = ConfigBuilder("spark.r.numRBackendThreads")
+    .intConf
+    .createWithDefault(2)
 
-  // Default value for spark.r.heartBeatInterval config
-  val DEFAULT_HEARTBEAT_INTERVAL: Int = 100
+  val R_HEARTBEAT_INTERVAL = ConfigBuilder("spark.r.heartBeatInterval")
+    .intConf
+    .createWithDefault(100)
 
-  // Default value for spark.r.numRBackendThreads config
-  val DEFAULT_NUM_RBACKEND_THREADS = 2
+  val SPARKR_COMMAND = ConfigBuilder("spark.sparkr.r.command")
+    .stringConf
+    .createWithDefault("Rscript")
+
+  val R_COMMAND = ConfigBuilder("spark.r.command")
+    .stringConf
+    .createOptional
 }
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index d8e9c09..da80604 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -166,10 +166,6 @@ package object config {
     .checkValue(_ >= 0, "The off-heap memory size must not be negative")
     .createWithDefault(0)
 
-  private[spark] val PYSPARK_EXECUTOR_MEMORY = ConfigBuilder("spark.executor.pyspark.memory")
-    .bytesConf(ByteUnit.MiB)
-    .createOptional
-
   private[spark] val IS_PYTHON_APP = ConfigBuilder("spark.yarn.isPython").internal()
     .booleanConf.createWithDefault(false)
 
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
index c8bf7cd..dd73a5e 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
@@ -25,6 +25,7 @@ import org.apache.spark.deploy.k8s._
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.internal.config._
+import org.apache.spark.internal.config.Python._
 import org.apache.spark.rpc.RpcEndpointAddress
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.util.Utils
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
index c2efab0..e28c650 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
@@ -30,6 +30,7 @@ import org.apache.spark.deploy.k8s.{KubernetesExecutorConf, KubernetesTestConf,
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.internal.config._
+import org.apache.spark.internal.config.Python._
 import org.apache.spark.rpc.RpcEndpointAddress
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.util.Utils
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 184fb6a..44a60b8 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -53,6 +53,7 @@ import org.apache.spark.deploy.yarn.config._
 import org.apache.spark.deploy.yarn.security.YARNHadoopDelegationTokenManager
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
+import org.apache.spark.internal.config.Python._
 import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle, YarnCommandBuilderUtils}
 import org.apache.spark.util.{CallerContext, Utils}
 
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index a3feca5..8c6eff99 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -36,6 +36,7 @@ import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
 import org.apache.spark.deploy.yarn.config._
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
+import org.apache.spark.internal.config.Python._
 import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef}
 import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason}
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RemoveExecutor

