You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2019/03/06 19:15:16 UTC

[spark] branch master updated: [SPARK-27023][K8S] Make k8s client timeouts configurable

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new e9e8bb3  [SPARK-27023][K8S] Make k8s client timeouts configurable
e9e8bb3 is described below

commit e9e8bb33ef9ad785473ded168bc85867dad4ee70
Author: Onur Satici <os...@palantir.com>
AuthorDate: Wed Mar 6 11:14:39 2019 -0800

    [SPARK-27023][K8S] Make k8s client timeouts configurable
    
    ## What changes were proposed in this pull request?
    
    Make k8s client timeouts configurable. No test suite exists for the client factory class; happy to add one if needed.
    
    Closes #23928 from onursatici/os/k8s-client-timeouts.
    
    Lead-authored-by: Onur Satici <os...@palantir.com>
    Co-authored-by: Onur Satici <on...@gmail.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 docs/running-on-kubernetes.md                      | 28 ++++++++++++++++++++++
 .../scala/org/apache/spark/deploy/k8s/Config.scala | 24 +++++++++++++++++++
 .../deploy/k8s/SparkKubernetesClientFactory.scala  | 20 ++++++++++++++++
 .../k8s/submit/KubernetesClientApplication.scala   |  1 +
 .../cluster/k8s/KubernetesClusterManager.scala     |  1 +
 5 files changed, 74 insertions(+)

diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md
index e23f28c..d82dc64 100644
--- a/docs/running-on-kubernetes.md
+++ b/docs/running-on-kubernetes.md
@@ -990,6 +990,34 @@ See the [configuration page](configuration.html) for information on Spark config
   Specify whether executor pods should be deleted in case of failure or normal termination.
   </td>
 </tr>
+<tr>
+  <td><code>spark.kubernetes.submission.connectionTimeout</code></td>
+  <td>10000</td>
+  <td>
+    Connection timeout in milliseconds for the Kubernetes client to use when starting the driver.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.kubernetes.submission.requestTimeout</code></td>
+  <td>10000</td>
+  <td>
+    Request timeout in milliseconds for the Kubernetes client to use when starting the driver.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.kubernetes.driver.connectionTimeout</code></td>
+  <td>10000</td>
+  <td>
+    Connection timeout in milliseconds for the Kubernetes client in the driver to use when requesting executors.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.kubernetes.driver.requestTimeout</code></td>
+  <td>10000</td>
+  <td>
+    Request timeout in milliseconds for the Kubernetes client in the driver to use when requesting executors.
+  </td>
+</tr>
 </table>
 
 #### Pod template properties
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
index 4cca1e2..83b5a75 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -86,6 +86,30 @@ private[spark] object Config extends Logging {
   val CLIENT_CERT_FILE_CONF_SUFFIX = "clientCertFile"
   val CA_CERT_FILE_CONF_SUFFIX = "caCertFile"
 
+  val SUBMISSION_CLIENT_CONNECTION_TIMEOUT =
+    ConfigBuilder("spark.kubernetes.submission.connectionTimeout")
+      .doc("connection timeout to be used in milliseconds for starting the driver")
+      .intConf
+      .createWithDefault(10000)
+
+  val SUBMISSION_CLIENT_REQUEST_TIMEOUT =
+    ConfigBuilder("spark.kubernetes.submission.requestTimeout")
+      .doc("request timeout to be used in milliseconds for starting the driver")
+      .intConf
+      .createWithDefault(10000)
+
+  val DRIVER_CLIENT_CONNECTION_TIMEOUT =
+    ConfigBuilder("spark.kubernetes.driver.connectionTimeout")
+      .doc("connection timeout to be used in milliseconds for driver to request executors")
+      .intConf
+      .createWithDefault(10000)
+
+  val DRIVER_CLIENT_REQUEST_TIMEOUT =
+    ConfigBuilder("spark.kubernetes.driver.requestTimeout")
+      .doc("request timeout to be used in milliseconds for driver to request executors")
+      .intConf
+      .createWithDefault(10000)
+
   val KUBERNETES_SERVICE_ACCOUNT_NAME =
     ConfigBuilder(s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.serviceAccountName")
       .doc("Service account that is used when running the driver pod. The driver pod uses " +
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala
index 06dea42..459259f 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala
@@ -28,6 +28,7 @@ import okhttp3.Dispatcher
 import org.apache.spark.SparkConf
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.ConfigEntry
 import org.apache.spark.util.ThreadUtils
 
 /**
@@ -41,6 +42,7 @@ private[spark] object SparkKubernetesClientFactory extends Logging {
       master: String,
       namespace: Option[String],
       kubernetesAuthConfPrefix: String,
+      clientType: ClientType.Value,
       sparkConf: SparkConf,
       defaultServiceAccountToken: Option[File],
       defaultServiceAccountCaCert: Option[File]): KubernetesClient = {
@@ -79,6 +81,8 @@ private[spark] object SparkKubernetesClientFactory extends Logging {
       .withApiVersion("v1")
       .withMasterUrl(master)
       .withWebsocketPingInterval(0)
+      .withRequestTimeout(clientType.requestTimeout(sparkConf))
+      .withConnectionTimeout(clientType.connectionTimeout(sparkConf))
       .withOption(oauthTokenValue) {
         (token, configBuilder) => configBuilder.withOauthToken(token)
       }.withOption(oauthTokenFile) {
@@ -111,4 +115,20 @@ private[spark] object SparkKubernetesClientFactory extends Logging {
       }.getOrElse(configBuilder)
     }
   }
+
+  object ClientType {
+    /** A client role; reads its request/connection timeouts (ms) from the given entries. */
+    sealed abstract class Value(
+        requestTimeoutEntry: ConfigEntry[Int],
+        connectionTimeoutEntry: ConfigEntry[Int]) {
+      def requestTimeout(conf: SparkConf): Int = conf.get(requestTimeoutEntry)
+      def connectionTimeout(conf: SparkConf): Int = conf.get(connectionTimeoutEntry)
+    }
+    /** Client used by the driver when requesting executors. */
+    case object Driver
+      extends Value(DRIVER_CLIENT_REQUEST_TIMEOUT, DRIVER_CLIENT_CONNECTION_TIMEOUT)
+    /** Client used at submission time when starting the driver. */
+    case object Submission
+      extends Value(SUBMISSION_CLIENT_REQUEST_TIMEOUT, SUBMISSION_CLIENT_CONNECTION_TIMEOUT)
+  }
 }
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
index 042012e..92d5176 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
@@ -220,6 +220,7 @@ private[spark] class KubernetesClientApplication extends SparkApplication {
       master,
       Some(kubernetesConf.namespace),
       KUBERNETES_AUTH_SUBMISSION_CONF_PREFIX,
+      SparkKubernetesClientFactory.ClientType.Submission,
       sparkConf,
       None,
       None)) { kubernetesClient =>
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
index 809bdf8..31ca06b 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
@@ -65,6 +65,7 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
       apiServerUri,
       Some(sc.conf.get(KUBERNETES_NAMESPACE)),
       authConfPrefix,
+      SparkKubernetesClientFactory.ClientType.Driver,
       sc.conf,
       defaultServiceAccountToken,
       defaultServiceAccountCaCrt)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org