You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2019/03/06 19:15:16 UTC
[spark] branch master updated: [SPARK-27023][K8S] Make k8s client
timeouts configurable
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new e9e8bb3 [SPARK-27023][K8S] Make k8s client timeouts configurable
e9e8bb3 is described below
commit e9e8bb33ef9ad785473ded168bc85867dad4ee70
Author: Onur Satici <os...@palantir.com>
AuthorDate: Wed Mar 6 11:14:39 2019 -0800
[SPARK-27023][K8S] Make k8s client timeouts configurable
## What changes were proposed in this pull request?
Make k8s client timeouts configurable. No test suite exists for the client factory class; happy to add one if needed.
Closes #23928 from onursatici/os/k8s-client-timeouts.
Lead-authored-by: Onur Satici <os...@palantir.com>
Co-authored-by: Onur Satici <on...@gmail.com>
Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
docs/running-on-kubernetes.md | 28 ++++++++++++++++++++++
.../scala/org/apache/spark/deploy/k8s/Config.scala | 24 +++++++++++++++++++
.../deploy/k8s/SparkKubernetesClientFactory.scala | 20 ++++++++++++++++
.../k8s/submit/KubernetesClientApplication.scala | 1 +
.../cluster/k8s/KubernetesClusterManager.scala | 1 +
5 files changed, 74 insertions(+)
diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md
index e23f28c..d82dc64 100644
--- a/docs/running-on-kubernetes.md
+++ b/docs/running-on-kubernetes.md
@@ -990,6 +990,34 @@ See the [configuration page](configuration.html) for information on Spark config
Specify whether executor pods should be deleted in case of failure or normal termination.
</td>
</tr>
+<tr>
+ <td><code>spark.kubernetes.submission.connectionTimeout</code></td>
+ <td>10000</td>
+ <td>
+ Connection timeout in milliseconds for the kubernetes client to use for starting the driver.
+ </td>
+</tr>
+<tr>
+ <td><code>spark.kubernetes.submission.requestTimeout</code></td>
+ <td>10000</td>
+ <td>
+ Request timeout in milliseconds for the kubernetes client to use for starting the driver.
+ </td>
+</tr>
+<tr>
+ <td><code>spark.kubernetes.driver.connectionTimeout</code></td>
+ <td>10000</td>
+ <td>
+ Connection timeout in milliseconds for the kubernetes client in the driver to use when requesting executors.
+ </td>
+</tr>
+<tr>
+ <td><code>spark.kubernetes.driver.requestTimeout</code></td>
+ <td>10000</td>
+ <td>
+ Request timeout in milliseconds for the kubernetes client in the driver to use when requesting executors.
+ </td>
+</tr>
</table>
#### Pod template properties
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
index 4cca1e2..83b5a75 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -86,6 +86,30 @@ private[spark] object Config extends Logging {
val CLIENT_CERT_FILE_CONF_SUFFIX = "clientCertFile"
val CA_CERT_FILE_CONF_SUFFIX = "caCertFile"
+ val SUBMISSION_CLIENT_REQUEST_TIMEOUT =
+ ConfigBuilder("spark.kubernetes.submission.requestTimeout")
+ .doc("request timeout to be used in milliseconds for starting the driver")
+ .intConf
+ .createWithDefault(10000)
+
+ val SUBMISSION_CLIENT_CONNECTION_TIMEOUT =
+ ConfigBuilder("spark.kubernetes.submission.connectionTimeout")
+ .doc("connection timeout to be used in milliseconds for starting the driver")
+ .intConf
+ .createWithDefault(10000)
+
+ val DRIVER_CLIENT_REQUEST_TIMEOUT =
+ ConfigBuilder("spark.kubernetes.driver.requestTimeout")
+ .doc("request timeout to be used in milliseconds for driver to request executors")
+ .intConf
+ .createWithDefault(10000)
+
+ val DRIVER_CLIENT_CONNECTION_TIMEOUT =
+ ConfigBuilder("spark.kubernetes.driver.connectionTimeout")
+ .doc("connection timeout to be used in milliseconds for driver to request executors")
+ .intConf
+ .createWithDefault(10000)
+
val KUBERNETES_SERVICE_ACCOUNT_NAME =
ConfigBuilder(s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.serviceAccountName")
.doc("Service account that is used when running the driver pod. The driver pod uses " +
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala
index 06dea42..459259f 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala
@@ -28,6 +28,7 @@ import okhttp3.Dispatcher
import org.apache.spark.SparkConf
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.ConfigEntry
import org.apache.spark.util.ThreadUtils
/**
@@ -41,6 +42,7 @@ private[spark] object SparkKubernetesClientFactory extends Logging {
master: String,
namespace: Option[String],
kubernetesAuthConfPrefix: String,
+ clientType: ClientType.Value,
sparkConf: SparkConf,
defaultServiceAccountToken: Option[File],
defaultServiceAccountCaCert: Option[File]): KubernetesClient = {
@@ -79,6 +81,8 @@ private[spark] object SparkKubernetesClientFactory extends Logging {
.withApiVersion("v1")
.withMasterUrl(master)
.withWebsocketPingInterval(0)
+ .withRequestTimeout(clientType.requestTimeout(sparkConf))
+ .withConnectionTimeout(clientType.connectionTimeout(sparkConf))
.withOption(oauthTokenValue) {
(token, configBuilder) => configBuilder.withOauthToken(token)
}.withOption(oauthTokenFile) {
@@ -111,4 +115,20 @@ private[spark] object SparkKubernetesClientFactory extends Logging {
}.getOrElse(configBuilder)
}
}
+
+ object ClientType extends Enumeration {
+ import scala.language.implicitConversions
+ val Driver = Val(DRIVER_CLIENT_REQUEST_TIMEOUT, DRIVER_CLIENT_CONNECTION_TIMEOUT)
+ val Submission = Val(SUBMISSION_CLIENT_REQUEST_TIMEOUT, SUBMISSION_CLIENT_CONNECTION_TIMEOUT)
+
+ protected case class Val(
+ requestTimeoutEntry: ConfigEntry[Int],
+ connectionTimeoutEntry: ConfigEntry[Int])
+ extends super.Val {
+ def requestTimeout(conf: SparkConf): Int = conf.get(requestTimeoutEntry)
+ def connectionTimeout(conf: SparkConf): Int = conf.get(connectionTimeoutEntry)
+ }
+
+ implicit def convert(value: Value): Val = value.asInstanceOf[Val]
+ }
}
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
index 042012e..92d5176 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
@@ -220,6 +220,7 @@ private[spark] class KubernetesClientApplication extends SparkApplication {
master,
Some(kubernetesConf.namespace),
KUBERNETES_AUTH_SUBMISSION_CONF_PREFIX,
+ SparkKubernetesClientFactory.ClientType.Submission,
sparkConf,
None,
None)) { kubernetesClient =>
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
index 809bdf8..31ca06b 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
@@ -65,6 +65,7 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
apiServerUri,
Some(sc.conf.get(KUBERNETES_NAMESPACE)),
authConfPrefix,
+ SparkKubernetesClientFactory.ClientType.Driver,
sc.conf,
defaultServiceAccountToken,
defaultServiceAccountCaCrt)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org