Posted to commits@spark.apache.org by va...@apache.org on 2019/03/14 16:30:23 UTC

[spark] branch branch-2.4 updated: [SPARK-26742][K8S][BRANCH-2.4] Update k8s client version to 4.1.2

This is an automated email from the ASF dual-hosted git repository.

vanzin pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new 2d4e9cf  [SPARK-26742][K8S][BRANCH-2.4] Update k8s client version to 4.1.2
2d4e9cf is described below

commit 2d4e9cf84b85a5f8278276e8d8ff59f6f4b11c4c
Author: Stavros Kontopoulos <st...@lightbend.com>
AuthorDate: Thu Mar 14 09:29:52 2019 -0700

    [SPARK-26742][K8S][BRANCH-2.4] Update k8s client version to 4.1.2
    
    ## What changes were proposed in this pull request?
    
    Updates the fabric8 kubernetes-client version from 3.0.0 to 4.1.2 and fixes the API breakages the 3.x-to-4.x upgrade introduces: builder-based model construction, plain-string timestamps, and renamed builder setters.
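
    The main client API change this requires is switching from direct model
    constructors to builders, as in the MountVolumesFeatureStep hunk below.
    A minimal sketch of the host-path volume change (volume name and path
    are hypothetical):

        import io.fabric8.kubernetes.api.model.{HostPathVolumeSourceBuilder, VolumeBuilder}

        val hostPath = "/tmp/data"  // hypothetical path
        // client 3.0.0 allowed: .withHostPath(new HostPathVolumeSource(hostPath))
        // client 4.1.2 builds the source through its builder instead:
        val volume = new VolumeBuilder()
          .withName("test-volume")  // hypothetical volume name
          .withHostPath(new HostPathVolumeSourceBuilder()
            .withPath(hostPath)
            .build())
          .build()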
    
    ## How was this patch tested?
    
    Tested with the latest minikube version and k8s 1.13.
    KubernetesSuite:
    - Run SparkPi with no resources
    - Run SparkPi with a very long application name.
    - Use SparkLauncher.NO_RESOURCE
    - Run SparkPi with a master URL without a scheme.
    - Run SparkPi with an argument.
    - Run SparkPi with custom labels, annotations, and environment variables.
    - Run extraJVMOptions check on driver
    - Run SparkRemoteFileTest using a remote data file
    - Run SparkPi with env and mount secrets.
    - Run PySpark on simple pi.py example
    - Run PySpark with Python2 to test a pyfiles example
    - Run PySpark with Python3 to test a pyfiles example
    - Run PySpark with memory customization
    - Run in client mode.
    Run completed in 4 minutes, 20 seconds.
    Total number of tests run: 14
    Suites: completed 2, aborted 0
    Tests: succeeded 14, failed 0, canceled 0, ignored 0, pending 0
    All tests passed.
    [INFO] ------------------------------------------------------------------------
    [INFO] Reactor Summary:
    [INFO]
    [INFO] Spark Project Parent POM 2.4.2-SNAPSHOT ............ SUCCESS [  2.980 s]
    [INFO] Spark Project Tags ................................. SUCCESS [  2.880 s]
    [INFO] Spark Project Local DB ............................. SUCCESS [  1.954 s]
    [INFO] Spark Project Networking ........................... SUCCESS [  3.369 s]
    [INFO] Spark Project Shuffle Streaming Service ............ SUCCESS [  1.791 s]
    [INFO] Spark Project Unsafe ............................... SUCCESS [  1.845 s]
    [INFO] Spark Project Launcher ............................. SUCCESS [  3.725 s]
    [INFO] Spark Project Core ................................. SUCCESS [ 23.572 s]
    [INFO] Spark Project Kubernetes Integration Tests 2.4.2-SNAPSHOT SUCCESS [04:25 min]
    [INFO] ------------------------------------------------------------------------
    [INFO] BUILD SUCCESS
    [INFO] ------------------------------------------------------------------------
    [INFO] Total time: 05:08 min
    [INFO] Finished at: 2019-03-06T18:03:55Z
    [INFO] ------------------------------------------------------------------------
    
    Closes #23993 from skonto/fix-k8s-version.
    
    Authored-by: Stavros Kontopoulos <st...@lightbend.com>
    Signed-off-by: Marcelo Vanzin <va...@cloudera.com>
---
 dev/deps/spark-deps-hadoop-2.6                     |  7 ++-
 dev/deps/spark-deps-hadoop-2.7                     |  7 ++-
 dev/deps/spark-deps-hadoop-3.1                     |  7 ++-
 resource-managers/kubernetes/core/pom.xml          |  2 +-
 .../k8s/features/MountVolumesFeatureStep.scala     |  4 +-
 .../k8s/submit/LoggingPodStatusWatcher.scala       |  6 +-
 .../cluster/k8s/ExecutorLifecycleTestUtils.scala   |  2 +-
 .../kubernetes/integration-tests/pom.xml           |  2 +-
 .../backend/minikube/Minikube.scala                | 71 +++++++++++++++++++---
 9 files changed, 82 insertions(+), 26 deletions(-)

diff --git a/dev/deps/spark-deps-hadoop-2.6 b/dev/deps/spark-deps-hadoop-2.6
index 307040e..0e34af7 100644
--- a/dev/deps/spark-deps-hadoop-2.6
+++ b/dev/deps/spark-deps-hadoop-2.6
@@ -131,13 +131,14 @@ jta-1.1.jar
 jtransforms-2.4.0.jar
 jul-to-slf4j-1.7.16.jar
 kryo-shaded-4.0.2.jar
-kubernetes-client-3.0.0.jar
-kubernetes-model-2.0.0.jar
+kubernetes-client-4.1.2.jar
+kubernetes-model-4.1.2.jar
+kubernetes-model-common-4.1.2.jar
 leveldbjni-all-1.8.jar
 libfb303-0.9.3.jar
 libthrift-0.9.3.jar
 log4j-1.2.17.jar
-logging-interceptor-3.8.1.jar
+logging-interceptor-3.12.0.jar
 lz4-java-1.4.0.jar
 machinist_2.11-0.6.1.jar
 macro-compat_2.11-1.1.1.jar
diff --git a/dev/deps/spark-deps-hadoop-2.7 b/dev/deps/spark-deps-hadoop-2.7
index 4a6ad3f..6b165a4 100644
--- a/dev/deps/spark-deps-hadoop-2.7
+++ b/dev/deps/spark-deps-hadoop-2.7
@@ -132,13 +132,14 @@ jta-1.1.jar
 jtransforms-2.4.0.jar
 jul-to-slf4j-1.7.16.jar
 kryo-shaded-4.0.2.jar
-kubernetes-client-3.0.0.jar
-kubernetes-model-2.0.0.jar
+kubernetes-client-4.1.2.jar
+kubernetes-model-4.1.2.jar
+kubernetes-model-common-4.1.2.jar
 leveldbjni-all-1.8.jar
 libfb303-0.9.3.jar
 libthrift-0.9.3.jar
 log4j-1.2.17.jar
-logging-interceptor-3.8.1.jar
+logging-interceptor-3.12.0.jar
 lz4-java-1.4.0.jar
 machinist_2.11-0.6.1.jar
 macro-compat_2.11-1.1.1.jar
diff --git a/dev/deps/spark-deps-hadoop-3.1 b/dev/deps/spark-deps-hadoop-3.1
index 83e243b..1ee3902 100644
--- a/dev/deps/spark-deps-hadoop-3.1
+++ b/dev/deps/spark-deps-hadoop-3.1
@@ -147,13 +147,14 @@ kerby-pkix-1.0.1.jar
 kerby-util-1.0.1.jar
 kerby-xdr-1.0.1.jar
 kryo-shaded-4.0.2.jar
-kubernetes-client-3.0.0.jar
-kubernetes-model-2.0.0.jar
+kubernetes-client-4.1.2.jar
+kubernetes-model-4.1.2.jar
+kubernetes-model-common-4.1.2.jar
 leveldbjni-all-1.8.jar
 libfb303-0.9.3.jar
 libthrift-0.9.3.jar
 log4j-1.2.17.jar
-logging-interceptor-3.8.1.jar
+logging-interceptor-3.12.0.jar
 lz4-java-1.4.0.jar
 machinist_2.11-0.6.1.jar
 macro-compat_2.11-1.1.1.jar
diff --git a/resource-managers/kubernetes/core/pom.xml b/resource-managers/kubernetes/core/pom.xml
index 788e706..d534183 100644
--- a/resource-managers/kubernetes/core/pom.xml
+++ b/resource-managers/kubernetes/core/pom.xml
@@ -29,7 +29,7 @@
   <name>Spark Project Kubernetes</name>
   <properties>
     <sbt.project.name>kubernetes</sbt.project.name>
-    <kubernetes.client.version>3.0.0</kubernetes.client.version>
+    <kubernetes.client.version>4.1.2</kubernetes.client.version>
   </properties>
 
   <dependencies>
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala
index bb0e2b3..026b7eb 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala
@@ -57,7 +57,9 @@ private[spark] class MountVolumesFeatureStep(
       val volumeBuilder = spec.volumeConf match {
         case KubernetesHostPathVolumeConf(hostPath) =>
           new VolumeBuilder()
-            .withHostPath(new HostPathVolumeSource(hostPath))
+            .withHostPath(new HostPathVolumeSourceBuilder()
+              .withPath(hostPath)
+              .build())
 
         case KubernetesPVCVolumeConf(claimName) =>
           new VolumeBuilder()
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala
index 173ac54..4a7d3d4 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala
@@ -20,7 +20,7 @@ import java.util.concurrent.{CountDownLatch, TimeUnit}
 
 import scala.collection.JavaConverters._
 
-import io.fabric8.kubernetes.api.model.{ContainerStateRunning, ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, Pod, Time}
+import io.fabric8.kubernetes.api.model.{ContainerStateRunning, ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, Pod}
 import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher}
 import io.fabric8.kubernetes.client.Watcher.Action
 
@@ -174,7 +174,7 @@ private[k8s] class LoggingPodStatusWatcherImpl(
         }.getOrElse(Seq(("Container state", "N/A")))
   }
 
-  private def formatTime(time: Time): String = {
-    if (time != null) time.getTime else "N/A"
+  private def formatTime(time: String): String = {
+    if (time != null && time != "") time else "N/A"
   }
 }
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorLifecycleTestUtils.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorLifecycleTestUtils.scala
index c6b667e..2e88362 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorLifecycleTestUtils.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorLifecycleTestUtils.scala
@@ -82,7 +82,7 @@ object ExecutorLifecycleTestUtils {
   def deletedExecutor(executorId: Long): Pod = {
     new PodBuilder(podWithAttachedContainerForId(executorId))
       .editOrNewMetadata()
-        .withNewDeletionTimestamp("523012521")
+        .withDeletionTimestamp("523012521")
         .endMetadata()
       .build()
   }
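
For reference, the two file changes above reflect that the 4.x model represents
timestamps as plain strings (the Time wrapper is gone) and renames the nested
builder setter to withDeletionTimestamp. A minimal sketch of the new API, with
hypothetical pod name and timestamp value:

    import io.fabric8.kubernetes.api.model.PodBuilder

    // In client 4.x, metadata timestamps are plain String fields.
    val pod = new PodBuilder()
      .withNewMetadata()
        .withName("spark-exec-1")                       // hypothetical name
        .withDeletionTimestamp("2019-03-06T18:03:55Z")  // hypothetical value
        .endMetadata()
      .build()
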
diff --git a/resource-managers/kubernetes/integration-tests/pom.xml b/resource-managers/kubernetes/integration-tests/pom.xml
index 47d15af..cd02b68 100644
--- a/resource-managers/kubernetes/integration-tests/pom.xml
+++ b/resource-managers/kubernetes/integration-tests/pom.xml
@@ -29,7 +29,7 @@
     <download-maven-plugin.version>1.3.0</download-maven-plugin.version>
     <exec-maven-plugin.version>1.4.0</exec-maven-plugin.version>
     <extraScalaTestArgs></extraScalaTestArgs>
-    <kubernetes-client.version>3.0.0</kubernetes-client.version>
+    <kubernetes-client.version>4.1.2</kubernetes-client.version>
     <scala-maven-plugin.version>3.2.2</scala-maven-plugin.version>
     <scalatest-maven-plugin.version>1.0</scalatest-maven-plugin.version>
     <sbt.project.name>kubernetes-integration-tests</sbt.project.name>
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala
index 6494cbc..78ef44b 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala
@@ -16,7 +16,6 @@
  */
 package org.apache.spark.deploy.k8s.integrationtest.backend.minikube
 
-import java.io.File
 import java.nio.file.Paths
 
 import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient}
@@ -26,8 +25,18 @@ import org.apache.spark.internal.Logging
 
 // TODO support windows
 private[spark] object Minikube extends Logging {
-
   private val MINIKUBE_STARTUP_TIMEOUT_SECONDS = 60
+  private val HOST_PREFIX = "host:"
+  private val KUBELET_PREFIX = "kubelet:"
+  private val APISERVER_PREFIX = "apiserver:"
+  private val KUBECTL_PREFIX = "kubectl:"
+  private val MINIKUBE_VM_PREFIX = "minikubeVM: "
+  private val MINIKUBE_PREFIX = "minikube: "
+  private val MINIKUBE_PATH = ".minikube"
+
+  def logVersion(): Unit = {
+    logInfo(executeMinikube("version").mkString("\n"))
+  }
 
   def getMinikubeIp: String = {
     val outputs = executeMinikube("ip")
@@ -38,12 +47,21 @@ private[spark] object Minikube extends Logging {
 
   def getMinikubeStatus: MinikubeStatus.Value = {
     val statusString = executeMinikube("status")
-      .filter(line => line.contains("minikubeVM: ") || line.contains("minikube:"))
-      .head
-      .replaceFirst("minikubeVM: ", "")
-      .replaceFirst("minikube: ", "")
-    MinikubeStatus.unapply(statusString)
+    logInfo(s"Minikube status command output:\n$statusString")
+    // For minikube versions up to v0.30.0, check the status using the old output format.
+    val oldMinikube = statusString
+      .filter(line => line.contains(MINIKUBE_VM_PREFIX) || line.contains(MINIKUBE_PREFIX))
+
+    if (oldMinikube.isEmpty) {
+      getIfNewMinikubeStatus(statusString)
+    } else {
+      val finalStatusString = oldMinikube
+        .head
+        .replaceFirst(MINIKUBE_VM_PREFIX, "")
+        .replaceFirst(MINIKUBE_PREFIX, "")
+      MinikubeStatus.unapply(finalStatusString)
         .getOrElse(throw new IllegalStateException(s"Unknown status $statusString"))
+    }
   }
 
   def getKubernetesClient: DefaultKubernetesClient = {
@@ -52,13 +70,46 @@ private[spark] object Minikube extends Logging {
     val kubernetesConf = new ConfigBuilder()
       .withApiVersion("v1")
       .withMasterUrl(kubernetesMaster)
-      .withCaCertFile(Paths.get(userHome, ".minikube", "ca.crt").toFile.getAbsolutePath)
-      .withClientCertFile(Paths.get(userHome, ".minikube", "apiserver.crt").toFile.getAbsolutePath)
-      .withClientKeyFile(Paths.get(userHome, ".minikube", "apiserver.key").toFile.getAbsolutePath)
+      .withCaCertFile(
+        Paths.get(userHome, MINIKUBE_PATH, "ca.crt").toFile.getAbsolutePath)
+      .withClientCertFile(
+        Paths.get(userHome, MINIKUBE_PATH, "apiserver.crt").toFile.getAbsolutePath)
+      .withClientKeyFile(
+        Paths.get(userHome, MINIKUBE_PATH, "apiserver.key").toFile.getAbsolutePath)
       .build()
     new DefaultKubernetesClient(kubernetesConf)
   }
 
+  // Covers the minikube status output format used after v0.30.0.
+  private def getIfNewMinikubeStatus(statusString: Seq[String]): MinikubeStatus.Value = {
+    val hostString = statusString.find(_.contains(s"$HOST_PREFIX "))
+    val kubeletString = statusString.find(_.contains(s"$KUBELET_PREFIX "))
+    val apiserverString = statusString.find(_.contains(s"$APISERVER_PREFIX "))
+    val kubectlString = statusString.find(_.contains(s"$KUBECTL_PREFIX "))
+
+    if (hostString.isEmpty || kubeletString.isEmpty
+      || apiserverString.isEmpty || kubectlString.isEmpty) {
+      MinikubeStatus.NONE
+    } else {
+      val status1 = hostString.get.replaceFirst(s"$HOST_PREFIX ", "")
+      val status2 = kubeletString.get.replaceFirst(s"$KUBELET_PREFIX ", "")
+      val status3 = apiserverString.get.replaceFirst(s"$APISERVER_PREFIX ", "")
+      val status4 = kubectlString.get.replaceFirst(s"$KUBECTL_PREFIX ", "")
+      if (!status4.contains("Correctly Configured:")) {
+        MinikubeStatus.NONE
+      } else {
+        val stats = List(status1, status2, status3)
+          .map(MinikubeStatus.unapply)
+          .map(_.getOrElse(throw new IllegalStateException(s"Unknown status $statusString")))
+        if (stats.exists(_ != MinikubeStatus.RUNNING)) {
+          MinikubeStatus.NONE
+        } else {
+          MinikubeStatus.RUNNING
+        }
+      }
+    }
+  }
+
   private def executeMinikube(action: String, args: String*): Seq[String] = {
     ProcessUtils.executeProcess(
       Array("bash", "-c", s"minikube $action") ++ args, MINIKUBE_STARTUP_TIMEOUT_SECONDS)

