You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by va...@apache.org on 2019/03/14 16:30:23 UTC
[spark] branch branch-2.4 updated: [SPARK-26742][K8S][BRANCH-2.4]
Update k8s client version to 4.1.2
This is an automated email from the ASF dual-hosted git repository.
vanzin pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-2.4 by this push:
new 2d4e9cf [SPARK-26742][K8S][BRANCH-2.4] Update k8s client version to 4.1.2
2d4e9cf is described below
commit 2d4e9cf84b85a5f8278276e8d8ff59f6f4b11c4c
Author: Stavros Kontopoulos <st...@lightbend.com>
AuthorDate: Thu Mar 14 09:29:52 2019 -0700
[SPARK-26742][K8S][BRANCH-2.4] Update k8s client version to 4.1.2
## What changes were proposed in this pull request?
Updates client version and fixes some related issues.
## How was this patch tested?
Tested with the latest minikube version and k8s 1.13.
KubernetesSuite:
- Run SparkPi with no resources
- Run SparkPi with a very long application name.
- Use SparkLauncher.NO_RESOURCE
- Run SparkPi with a master URL without a scheme.
- Run SparkPi with an argument.
- Run SparkPi with custom labels, annotations, and environment variables.
- Run extraJVMOptions check on driver
- Run SparkRemoteFileTest using a remote data file
- Run SparkPi with env and mount secrets.
- Run PySpark on simple pi.py example
- Run PySpark with Python2 to test a pyfiles example
- Run PySpark with Python3 to test a pyfiles example
- Run PySpark with memory customization
- Run in client mode.
Run completed in 4 minutes, 20 seconds.
Total number of tests run: 14
Suites: completed 2, aborted 0
Tests: succeeded 14, failed 0, canceled 0, ignored 0, pending 0
All tests passed.
[INFO] ------------------------------------------------------------------------
[INFO] Reactor Summary:
[INFO]
[INFO] Spark Project Parent POM 2.4.2-SNAPSHOT ............ SUCCESS [ 2.980 s]
[INFO] Spark Project Tags ................................. SUCCESS [ 2.880 s]
[INFO] Spark Project Local DB ............................. SUCCESS [ 1.954 s]
[INFO] Spark Project Networking ........................... SUCCESS [ 3.369 s]
[INFO] Spark Project Shuffle Streaming Service ............ SUCCESS [ 1.791 s]
[INFO] Spark Project Unsafe ............................... SUCCESS [ 1.845 s]
[INFO] Spark Project Launcher ............................. SUCCESS [ 3.725 s]
[INFO] Spark Project Core ................................. SUCCESS [ 23.572 s]
[INFO] Spark Project Kubernetes Integration Tests 2.4.2-SNAPSHOT SUCCESS [04:25 min]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 05:08 min
[INFO] Finished at: 2019-03-06T18:03:55Z
[INFO] ------------------------------------------------------------------------
Closes #23993 from skonto/fix-k8s-version.
Authored-by: Stavros Kontopoulos <st...@lightbend.com>
Signed-off-by: Marcelo Vanzin <va...@cloudera.com>
---
dev/deps/spark-deps-hadoop-2.6 | 7 ++-
dev/deps/spark-deps-hadoop-2.7 | 7 ++-
dev/deps/spark-deps-hadoop-3.1 | 7 ++-
resource-managers/kubernetes/core/pom.xml | 2 +-
.../k8s/features/MountVolumesFeatureStep.scala | 4 +-
.../k8s/submit/LoggingPodStatusWatcher.scala | 6 +-
.../cluster/k8s/ExecutorLifecycleTestUtils.scala | 2 +-
.../kubernetes/integration-tests/pom.xml | 2 +-
.../backend/minikube/Minikube.scala | 71 +++++++++++++++++++---
9 files changed, 82 insertions(+), 26 deletions(-)
diff --git a/dev/deps/spark-deps-hadoop-2.6 b/dev/deps/spark-deps-hadoop-2.6
index 307040e..0e34af7 100644
--- a/dev/deps/spark-deps-hadoop-2.6
+++ b/dev/deps/spark-deps-hadoop-2.6
@@ -131,13 +131,14 @@ jta-1.1.jar
jtransforms-2.4.0.jar
jul-to-slf4j-1.7.16.jar
kryo-shaded-4.0.2.jar
-kubernetes-client-3.0.0.jar
-kubernetes-model-2.0.0.jar
+kubernetes-client-4.1.2.jar
+kubernetes-model-4.1.2.jar
+kubernetes-model-common-4.1.2.jar
leveldbjni-all-1.8.jar
libfb303-0.9.3.jar
libthrift-0.9.3.jar
log4j-1.2.17.jar
-logging-interceptor-3.8.1.jar
+logging-interceptor-3.12.0.jar
lz4-java-1.4.0.jar
machinist_2.11-0.6.1.jar
macro-compat_2.11-1.1.1.jar
diff --git a/dev/deps/spark-deps-hadoop-2.7 b/dev/deps/spark-deps-hadoop-2.7
index 4a6ad3f..6b165a4 100644
--- a/dev/deps/spark-deps-hadoop-2.7
+++ b/dev/deps/spark-deps-hadoop-2.7
@@ -132,13 +132,14 @@ jta-1.1.jar
jtransforms-2.4.0.jar
jul-to-slf4j-1.7.16.jar
kryo-shaded-4.0.2.jar
-kubernetes-client-3.0.0.jar
-kubernetes-model-2.0.0.jar
+kubernetes-client-4.1.2.jar
+kubernetes-model-4.1.2.jar
+kubernetes-model-common-4.1.2.jar
leveldbjni-all-1.8.jar
libfb303-0.9.3.jar
libthrift-0.9.3.jar
log4j-1.2.17.jar
-logging-interceptor-3.8.1.jar
+logging-interceptor-3.12.0.jar
lz4-java-1.4.0.jar
machinist_2.11-0.6.1.jar
macro-compat_2.11-1.1.1.jar
diff --git a/dev/deps/spark-deps-hadoop-3.1 b/dev/deps/spark-deps-hadoop-3.1
index 83e243b..1ee3902 100644
--- a/dev/deps/spark-deps-hadoop-3.1
+++ b/dev/deps/spark-deps-hadoop-3.1
@@ -147,13 +147,14 @@ kerby-pkix-1.0.1.jar
kerby-util-1.0.1.jar
kerby-xdr-1.0.1.jar
kryo-shaded-4.0.2.jar
-kubernetes-client-3.0.0.jar
-kubernetes-model-2.0.0.jar
+kubernetes-client-4.1.2.jar
+kubernetes-model-4.1.2.jar
+kubernetes-model-common-4.1.2.jar
leveldbjni-all-1.8.jar
libfb303-0.9.3.jar
libthrift-0.9.3.jar
log4j-1.2.17.jar
-logging-interceptor-3.8.1.jar
+logging-interceptor-3.12.0.jar
lz4-java-1.4.0.jar
machinist_2.11-0.6.1.jar
macro-compat_2.11-1.1.1.jar
diff --git a/resource-managers/kubernetes/core/pom.xml b/resource-managers/kubernetes/core/pom.xml
index 788e706..d534183 100644
--- a/resource-managers/kubernetes/core/pom.xml
+++ b/resource-managers/kubernetes/core/pom.xml
@@ -29,7 +29,7 @@
<name>Spark Project Kubernetes</name>
<properties>
<sbt.project.name>kubernetes</sbt.project.name>
- <kubernetes.client.version>3.0.0</kubernetes.client.version>
+ <kubernetes.client.version>4.1.2</kubernetes.client.version>
</properties>
<dependencies>
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala
index bb0e2b3..026b7eb 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala
@@ -57,7 +57,9 @@ private[spark] class MountVolumesFeatureStep(
val volumeBuilder = spec.volumeConf match {
case KubernetesHostPathVolumeConf(hostPath) =>
new VolumeBuilder()
- .withHostPath(new HostPathVolumeSource(hostPath))
+ .withHostPath(new HostPathVolumeSourceBuilder()
+ .withPath(hostPath)
+ .build())
case KubernetesPVCVolumeConf(claimName) =>
new VolumeBuilder()
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala
index 173ac54..4a7d3d4 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala
@@ -20,7 +20,7 @@ import java.util.concurrent.{CountDownLatch, TimeUnit}
import scala.collection.JavaConverters._
-import io.fabric8.kubernetes.api.model.{ContainerStateRunning, ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, Pod, Time}
+import io.fabric8.kubernetes.api.model.{ContainerStateRunning, ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, Pod}
import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher}
import io.fabric8.kubernetes.client.Watcher.Action
@@ -174,7 +174,7 @@ private[k8s] class LoggingPodStatusWatcherImpl(
}.getOrElse(Seq(("Container state", "N/A")))
}
- private def formatTime(time: Time): String = {
- if (time != null) time.getTime else "N/A"
+ private def formatTime(time: String): String = {
+ if (time != null || time != "") time else "N/A"
}
}
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorLifecycleTestUtils.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorLifecycleTestUtils.scala
index c6b667e..2e88362 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorLifecycleTestUtils.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorLifecycleTestUtils.scala
@@ -82,7 +82,7 @@ object ExecutorLifecycleTestUtils {
def deletedExecutor(executorId: Long): Pod = {
new PodBuilder(podWithAttachedContainerForId(executorId))
.editOrNewMetadata()
- .withNewDeletionTimestamp("523012521")
+ .withDeletionTimestamp("523012521")
.endMetadata()
.build()
}
diff --git a/resource-managers/kubernetes/integration-tests/pom.xml b/resource-managers/kubernetes/integration-tests/pom.xml
index 47d15af..cd02b68 100644
--- a/resource-managers/kubernetes/integration-tests/pom.xml
+++ b/resource-managers/kubernetes/integration-tests/pom.xml
@@ -29,7 +29,7 @@
<download-maven-plugin.version>1.3.0</download-maven-plugin.version>
<exec-maven-plugin.version>1.4.0</exec-maven-plugin.version>
<extraScalaTestArgs></extraScalaTestArgs>
- <kubernetes-client.version>3.0.0</kubernetes-client.version>
+ <kubernetes-client.version>4.1.2</kubernetes-client.version>
<scala-maven-plugin.version>3.2.2</scala-maven-plugin.version>
<scalatest-maven-plugin.version>1.0</scalatest-maven-plugin.version>
<sbt.project.name>kubernetes-integration-tests</sbt.project.name>
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala
index 6494cbc..78ef44b 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala
@@ -16,7 +16,6 @@
*/
package org.apache.spark.deploy.k8s.integrationtest.backend.minikube
-import java.io.File
import java.nio.file.Paths
import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient}
@@ -26,8 +25,18 @@ import org.apache.spark.internal.Logging
// TODO support windows
private[spark] object Minikube extends Logging {
-
private val MINIKUBE_STARTUP_TIMEOUT_SECONDS = 60
+ private val HOST_PREFIX = "host:"
+ private val KUBELET_PREFIX = "kubelet:"
+ private val APISERVER_PREFIX = "apiserver:"
+ private val KUBECTL_PREFIX = "kubectl:"
+ private val MINIKUBE_VM_PREFIX = "minikubeVM: "
+ private val MINIKUBE_PREFIX = "minikube: "
+ private val MINIKUBE_PATH = ".minikube"
+
+ def logVersion(): Unit = {
+ logInfo(executeMinikube("version").mkString("\n"))
+ }
def getMinikubeIp: String = {
val outputs = executeMinikube("ip")
@@ -38,12 +47,21 @@ private[spark] object Minikube extends Logging {
def getMinikubeStatus: MinikubeStatus.Value = {
val statusString = executeMinikube("status")
- .filter(line => line.contains("minikubeVM: ") || line.contains("minikube:"))
- .head
- .replaceFirst("minikubeVM: ", "")
- .replaceFirst("minikube: ", "")
- MinikubeStatus.unapply(statusString)
+ logInfo(s"Minikube status command output:\n$statusString")
+ // up to minikube version v0.30.0 use this to check for minikube status
+ val oldMinikube = statusString
+ .filter(line => line.contains(MINIKUBE_VM_PREFIX) || line.contains(MINIKUBE_PREFIX))
+
+ if (oldMinikube.isEmpty) {
+ getIfNewMinikubeStatus(statusString)
+ } else {
+ val finalStatusString = oldMinikube
+ .head
+ .replaceFirst(MINIKUBE_VM_PREFIX, "")
+ .replaceFirst(MINIKUBE_PREFIX, "")
+ MinikubeStatus.unapply(finalStatusString)
.getOrElse(throw new IllegalStateException(s"Unknown status $statusString"))
+ }
}
def getKubernetesClient: DefaultKubernetesClient = {
@@ -52,13 +70,46 @@ private[spark] object Minikube extends Logging {
val kubernetesConf = new ConfigBuilder()
.withApiVersion("v1")
.withMasterUrl(kubernetesMaster)
- .withCaCertFile(Paths.get(userHome, ".minikube", "ca.crt").toFile.getAbsolutePath)
- .withClientCertFile(Paths.get(userHome, ".minikube", "apiserver.crt").toFile.getAbsolutePath)
- .withClientKeyFile(Paths.get(userHome, ".minikube", "apiserver.key").toFile.getAbsolutePath)
+ .withCaCertFile(
+ Paths.get(userHome, MINIKUBE_PATH, "ca.crt").toFile.getAbsolutePath)
+ .withClientCertFile(
+ Paths.get(userHome, MINIKUBE_PATH, "apiserver.crt").toFile.getAbsolutePath)
+ .withClientKeyFile(
+ Paths.get(userHome, MINIKUBE_PATH, "apiserver.key").toFile.getAbsolutePath)
.build()
new DefaultKubernetesClient(kubernetesConf)
}
+ // Covers minikube status output after Minikube V0.30.
+ private def getIfNewMinikubeStatus(statusString: Seq[String]): MinikubeStatus.Value = {
+ val hostString = statusString.find(_.contains(s"$HOST_PREFIX "))
+ val kubeletString = statusString.find(_.contains(s"$KUBELET_PREFIX "))
+ val apiserverString = statusString.find(_.contains(s"$APISERVER_PREFIX "))
+ val kubectlString = statusString.find(_.contains(s"$KUBECTL_PREFIX "))
+
+ if (hostString.isEmpty || kubeletString.isEmpty
+ || apiserverString.isEmpty || kubectlString.isEmpty) {
+ MinikubeStatus.NONE
+ } else {
+ val status1 = hostString.get.replaceFirst(s"$HOST_PREFIX ", "")
+ val status2 = kubeletString.get.replaceFirst(s"$KUBELET_PREFIX ", "")
+ val status3 = apiserverString.get.replaceFirst(s"$APISERVER_PREFIX ", "")
+ val status4 = kubectlString.get.replaceFirst(s"$KUBECTL_PREFIX ", "")
+ if (!status4.contains("Correctly Configured:")) {
+ MinikubeStatus.NONE
+ } else {
+ val stats = List(status1, status2, status3)
+ .map(MinikubeStatus.unapply)
+ .map(_.getOrElse(throw new IllegalStateException(s"Unknown status $statusString")))
+ if (stats.exists(_ != MinikubeStatus.RUNNING)) {
+ MinikubeStatus.NONE
+ } else {
+ MinikubeStatus.RUNNING
+ }
+ }
+ }
+ }
+
private def executeMinikube(action: String, args: String*): Seq[String] = {
ProcessUtils.executeProcess(
Array("bash", "-c", s"minikube $action") ++ args, MINIKUBE_STARTUP_TIMEOUT_SECONDS)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org