Posted to commits@spark.apache.org by do...@apache.org on 2021/04/30 18:40:23 UTC

[spark] branch master updated: [SPARK-35280][K8S] Promote KubernetesUtils to DeveloperApi

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 4e8701a  [SPARK-35280][K8S] Promote KubernetesUtils to DeveloperApi
4e8701a is described below

commit 4e8701a77dff729c4e8e0ad39c16e2717c2c32fe
Author: Dongjoon Hyun <dh...@apple.com>
AuthorDate: Fri Apr 30 11:39:18 2021 -0700

    [SPARK-35280][K8S] Promote KubernetesUtils to DeveloperApi
    
    ### What changes were proposed in this pull request?
    
    Since SPARK-22757, `KubernetesUtils` has been used as an important utility class by all K8s modules and by `ExternalClusterManager` implementations. This PR aims to promote `KubernetesUtils` to `DeveloperApi` so that it can be maintained officially, in a backward-compatible way, as of Apache Spark 3.2.0.
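
    Once the `private[spark]` modifier is dropped, these helpers become callable from user code. A minimal, hypothetical sketch of calling two of the promoted functions (the configuration key and the URL are illustrative):

    ```scala
    import org.apache.spark.SparkConf
    import org.apache.spark.deploy.k8s.KubernetesUtils

    val conf = new SparkConf()
      .set("spark.kubernetes.driver.label.team", "data-eng")

    // Strip the "k8s://" scheme to recover the API server address.
    val apiServer = KubernetesUtils.parseMasterUrl("k8s://https://example.com:6443")

    // Collect every property under a given prefix into a Map.
    val labels = KubernetesUtils.parsePrefixedKeyValuePairs(
      conf, "spark.kubernetes.driver.label.")
    ```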
    
    ### Why are the changes needed?
    
    Apache Spark 3.1.1 made the `Kubernetes` module GA and provided an extensible external cluster manager framework. To implement an `ExternalClusterManager` for a K8s environment, the `KubernetesUtils` class is crucial and needs to be stable. By promoting it into the K8s developer API surface, we can maintain it in a more sustainable way and provide better, more stable functionality to K8s users.
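
    For context, Spark discovers custom cluster managers via Java's `ServiceLoader` (a `META-INF/services/org.apache.spark.scheduler.ExternalClusterManager` resource naming the implementation). Below is a rough, hypothetical sketch of such a manager leaning on the promoted utilities; it is not the actual K8s scheduler wiring, the class and package names are made up, and the backend creation is elided:

    ```scala
    // The trait is package-private to org.apache.spark, so an external
    // implementation typically lives under that package.
    package org.apache.spark.myk8s

    import org.apache.spark.SparkContext
    import org.apache.spark.deploy.k8s.KubernetesUtils
    import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend,
      TaskScheduler, TaskSchedulerImpl}

    class MyK8sClusterManager extends ExternalClusterManager {
      override def canCreate(masterURL: String): Boolean =
        masterURL.startsWith("k8s://")

      override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler = {
        // Promoted helper: strips the "k8s://" prefix from the master URL.
        val apiServer = KubernetesUtils.parseMasterUrl(masterURL)
        require(apiServer.nonEmpty, "Missing API server address")
        new TaskSchedulerImpl(sc)
      }

      override def createSchedulerBackend(
          sc: SparkContext,
          masterURL: String,
          scheduler: TaskScheduler): SchedulerBackend =
        throw new UnsupportedOperationException("backend wiring elided in this sketch")

      override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit =
        scheduler.asInstanceOf[TaskSchedulerImpl].initialize(backend)
    }
    ```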
    
    In this PR, the `Since` annotations denote the version of each function's last signature change, because these functions become public at Apache Spark 3.2.0. A short usage sketch of a few of these helpers follows the table.
    
    | Version | Function Name |
    |-|-|
    | 2.3.0 | parsePrefixedKeyValuePairs |
    | 2.3.0 | requireNandDefined |
    | 2.4.0 | parseMasterUrl |
    | 3.0.0 | requireBothOrNeitherDefined |
    | 3.0.0 | requireSecondIfFirstIsDefined |
    | 3.0.0 | selectSparkContainer |
    | 3.0.0 | formatPairsBundle |
    | 3.0.0 | formatPodState |
    | 3.0.0 | containersDescription |
    | 3.0.0 | containerStatusDescription |
    | 3.0.0 | formatTime |
    | 3.0.0 | uniqueID |
    | 3.0.0 | buildResourcesQuantities |
    | 3.0.0 | uploadAndTransformFileUris |
    | 3.0.0 | uploadFileUri |
    | 3.0.0 | buildPodWithServiceAccount |
    | 3.0.0 | isLocalAndResolvable |
    | 3.1.1 | renameMainAppResource |
    | 3.1.1 | addOwnerReference |
    | 3.2.0 | loadPodFromTemplate |
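
    For illustration, a hedged sketch exercising a few of the helpers listed above (all argument values are made up):

    ```scala
    import org.apache.spark.deploy.k8s.KubernetesUtils

    // Fail fast when two mutually exclusive options are both set.
    KubernetesUtils.requireNandDefined(
      Some("pod-template-a.yaml"), None,
      "Only one pod template may be specified.")

    // Short random suffix for naming K8s resources.
    val suffix = KubernetesUtils.uniqueID()

    // Null-safe timestamp rendering for status output.
    val shown = KubernetesUtils.formatTime(null)  // returns "N/A"
    ```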
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, but these are new API additions.
    
    ### How was this patch tested?
    
    Pass the CIs.
    
    Closes #32406 from dongjoon-hyun/SPARK-35280.
    
    Authored-by: Dongjoon Hyun <dh...@apple.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../apache/spark/deploy/k8s/KubernetesUtils.scala  | 30 +++++++++++++++++++++-
 1 file changed, 29 insertions(+), 1 deletion(-)

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
index 6bc31c2..0c8d964 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
@@ -29,6 +29,7 @@ import org.apache.commons.codec.binary.Hex
 import org.apache.hadoop.fs.{FileSystem, Path}
 
 import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.annotation.{DeveloperApi, Since, Unstable}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.deploy.k8s.Config.KUBERNETES_FILE_UPLOAD_PATH
 import org.apache.spark.internal.Logging
@@ -38,7 +39,14 @@ import org.apache.spark.util.{Clock, SystemClock, Utils}
 import org.apache.spark.util.DependencyUtils.downloadFile
 import org.apache.spark.util.Utils.getHadoopFileSystem
 
-private[spark] object KubernetesUtils extends Logging {
+/**
+ * :: DeveloperApi ::
+ *
+ * A utility class used for K8s operations internally and for implementing ExternalClusterManagers.
+ */
+@Unstable
+@DeveloperApi
+object KubernetesUtils extends Logging {
 
   private val systemClock = new SystemClock()
   private lazy val RNG = new SecureRandom()
@@ -51,12 +59,14 @@ private[spark] object KubernetesUtils extends Logging {
    * @param prefix the given property name prefix
    * @return a Map storing the configuration property keys and values
    */
+  @Since("2.3.0")
   def parsePrefixedKeyValuePairs(
       sparkConf: SparkConf,
       prefix: String): Map[String, String] = {
     sparkConf.getAllWithPrefix(prefix).toMap
   }
 
+  @Since("3.0.0")
   def requireBothOrNeitherDefined(
       opt1: Option[_],
       opt2: Option[_],
@@ -66,6 +76,7 @@ private[spark] object KubernetesUtils extends Logging {
     requireSecondIfFirstIsDefined(opt2, opt1, errMessageWhenFirstIsMissing)
   }
 
+  @Since("3.0.0")
   def requireSecondIfFirstIsDefined(
       opt1: Option[_],
       opt2: Option[_],
@@ -75,11 +86,13 @@ private[spark] object KubernetesUtils extends Logging {
     }
   }
 
+  @Since("2.3.0")
   def requireNandDefined(opt1: Option[_], opt2: Option[_], errMessage: String): Unit = {
     opt1.foreach { _ => require(opt2.isEmpty, errMessage) }
     opt2.foreach { _ => require(opt1.isEmpty, errMessage) }
   }
 
+  @Since("3.2.0")
   def loadPodFromTemplate(
       kubernetesClient: KubernetesClient,
       templateFileName: String,
@@ -99,6 +112,7 @@ private[spark] object KubernetesUtils extends Logging {
     }
   }
 
+  @Since("3.0.0")
   def selectSparkContainer(pod: Pod, containerName: Option[String]): SparkPod = {
     def selectNamedContainer(
       containers: List[Container], name: String): Option[(Container, List[Container])] =
@@ -125,8 +139,10 @@ private[spark] object KubernetesUtils extends Logging {
       }.getOrElse(SparkPod(pod, new ContainerBuilder().build()))
   }
 
+  @Since("2.4.0")
   def parseMasterUrl(url: String): String = url.substring("k8s://".length)
 
+  @Since("3.0.0")
   def formatPairsBundle(pairs: Seq[(String, String)], indent: Int = 1) : String = {
     // Use more loggable format if value is null or empty
     val indentStr = "\t" * indent
@@ -141,6 +157,7 @@ private[spark] object KubernetesUtils extends Logging {
    * @param pod Pod
    * @return Human readable pod state
    */
+  @Since("3.0.0")
   def formatPodState(pod: Pod): String = {
     val details = Seq[(String, String)](
       // pod metadata
@@ -164,6 +181,7 @@ private[spark] object KubernetesUtils extends Logging {
     formatPairsBundle(details)
   }
 
+  @Since("3.0.0")
   def containersDescription(p: Pod, indent: Int = 1): String = {
     p.getStatus.getContainerStatuses.asScala.map { status =>
       Seq(
@@ -173,6 +191,7 @@ private[spark] object KubernetesUtils extends Logging {
     }.map(p => formatPairsBundle(p, indent)).mkString("\n\n")
   }
 
+  @Since("3.0.0")
   def containerStatusDescription(containerStatus: ContainerStatus)
     : Seq[(String, String)] = {
     val state = containerStatus.getState
@@ -200,6 +219,7 @@ private[spark] object KubernetesUtils extends Logging {
       }.getOrElse(Seq(("container state", "N/A")))
   }
 
+  @Since("3.0.0")
   def formatTime(time: String): String = {
     if (time != null) time else "N/A"
   }
@@ -212,6 +232,7 @@ private[spark] object KubernetesUtils extends Logging {
    * This avoids using a UUID for uniqueness (too long), and relying solely on the current time
    * (not unique enough).
    */
+  @Since("3.0.0")
   def uniqueID(clock: Clock = systemClock): String = {
     val random = new Array[Byte](3)
     synchronized {
@@ -228,6 +249,7 @@ private[spark] object KubernetesUtils extends Logging {
    * It assumes we can use the Kubernetes device plugin format: vendor-domain/resource.
    * It returns a set with a tuple of vendor-domain/resource and Quantity for each resource.
    */
+  @Since("3.0.0")
   def buildResourcesQuantities(
       componentName: String,
       sparkConf: SparkConf): Map[String, Quantity] = {
@@ -247,6 +269,7 @@ private[spark] object KubernetesUtils extends Logging {
   /**
    * Upload files and modify their uris
    */
+  @Since("3.0.0")
   def uploadAndTransformFileUris(fileUris: Iterable[String], conf: Option[SparkConf] = None)
     : Iterable[String] = {
     fileUris.map { uri =>
@@ -261,11 +284,13 @@ private[spark] object KubernetesUtils extends Logging {
     }
   }
 
+  @Since("3.0.0")
   def isLocalAndResolvable(resource: String): Boolean = {
     resource != SparkLauncher.NO_RESOURCE &&
       isLocalDependency(Utils.resolveURI(resource))
   }
 
+  @Since("3.1.1")
   def renameMainAppResource(
       resource: String,
       conf: Option[SparkConf] = None,
@@ -281,6 +306,7 @@ private[spark] object KubernetesUtils extends Logging {
     }
   }
 
+  @Since("3.0.0")
   def uploadFileUri(uri: String, conf: Option[SparkConf] = None): String = {
     conf match {
       case Some(sConf) =>
@@ -325,6 +351,7 @@ private[spark] object KubernetesUtils extends Logging {
     }
   }
 
+  @Since("3.0.0")
   def buildPodWithServiceAccount(serviceAccount: Option[String], pod: SparkPod): Option[Pod] = {
     serviceAccount.map { account =>
       new PodBuilder(pod.pod)
@@ -338,6 +365,7 @@ private[spark] object KubernetesUtils extends Logging {
 
 // Add an OwnerReference to the given resources, making the pod their owner so that when
   // the pod is deleted, the resources are garbage collected.
+  @Since("3.1.1")
   def addOwnerReference(pod: Pod, resources: Seq[HasMetadata]): Unit = {
     if (pod != null) {
       val reference = new OwnerReferenceBuilder()

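For readers wiring several of these helpers together, here is a hedged end-to-end sketch. It assumes the Spark K8s core module and the fabric8 Kubernetes client are on the classpath and that the referenced types (e.g. `SparkPod`) are visible to the caller; the names and values are illustrative:

```scala
import io.fabric8.kubernetes.api.model.ConfigMapBuilder
import org.apache.spark.deploy.k8s.{KubernetesUtils, SparkPod}

object PodWiringSketch {
  def main(args: Array[String]): Unit = {
    // Start from Spark's empty pod/container pair.
    val base = SparkPod.initialPod()

    // Option[Pod]: defined only when a service account name is supplied.
    val driverPod = KubernetesUtils.buildPodWithServiceAccount(Some("spark-sa"), base)

    // A config map named with the promoted uniqueID() helper.
    val configMap = new ConfigMapBuilder()
      .withNewMetadata()
        .withName(s"spark-conf-${KubernetesUtils.uniqueID()}")
      .endMetadata()
      .build()

    // Make the pod own the config map so K8s garbage-collects it with the pod.
    driverPod.foreach(pod => KubernetesUtils.addOwnerReference(pod, Seq(configMap)))
  }
}
```
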
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org