Posted to commits@spark.apache.org by li...@apache.org on 2018/09/13 05:03:33 UTC

spark git commit: [SPARK-25295][K8S] Fix executor names collision

Repository: spark
Updated Branches:
  refs/heads/master 8b702e1e0 -> 3e75a9fa2


[SPARK-25295][K8S] Fix executor names collision

## What changes were proposed in this pull request?
Fixes the executor-name collision issue in client mode; see SPARK-25295 for the details.
It follows the cluster-mode naming convention: the app name is used as the prefix, and if none is defined we fall back to "spark" as the default. E.g. `spark-pi-1536781360723-exec-1`, where `spark-pi` is the app name passed in via config, or a sanitized version of it if it contains illegal characters.
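
As a rough sketch of the convention, the executor pod name is the sanitized prefix plus the executor ID (the actual derivation is `getResourceNamePrefix` in the diff below; the timestamp here is illustrative):
```
// Sketch only: illustrates the naming convention, not the committed code.
val appName = "spark-pi"              // from spark.app.name, or "spark" if unset
val launchTime = 1536781360723L       // System.currentTimeMillis() at submit time
val prefix = s"$appName-$launchTime"  // "spark-pi-1536781360723"
val executorName = s"$prefix-exec-1"  // "spark-pi-1536781360723-exec-1"
```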

Also fixes the issue of the Spark app name containing spaces in cluster mode.
The Spark Pi test passes when run in client mode.
The tricky part is that the user may set the app name:
https://github.com/apache/spark/blob/3030b82c89d3e45a2e361c469fbc667a1e43b854/examples/src/main/scala/org/apache/spark/examples/SparkPi.scala#L30
If I do:

```
./bin/spark-submit
...
 --deploy-mode cluster --name "spark pi"
...
```
it will fail, because the app name is used as the prefix of the driver's pod name, and pod names cannot contain spaces (per Kubernetes naming conventions).
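
With this fix, such names are sanitized before being used as a prefix. A minimal sketch of the transformation chain (mirroring `getResourceNamePrefix` in the diff below, with an illustrative launch time):
```
// "spark pi" plus the launch-time suffix, sanitized step by step:
val sanitized = "spark pi-1536781360723"
  .trim
  .toLowerCase
  .replaceAll("\\s+", "-")          // whitespace -> hyphens
  .replaceAll("\\.", "-")           // dots -> hyphens
  .replaceAll("[^a-z0-9\\-]", "")   // drop any other illegal characters
  .replaceAll("-+", "-")            // collapse hyphen runs
// sanitized == "spark-pi-1536781360723", a valid k8s resource name
```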

## How was this patch tested?

Manually, by running a Spark job in client mode.
To reproduce:
```
kubectl create -f service.yaml
kubectl create -f pod.yaml
```
service.yaml:
```
kind: Service
apiVersion: v1
metadata:
  name: spark-test-app-1-svc
spec:
  clusterIP: None
  selector:
    spark-app-selector: spark-test-app-1
  ports:
  - protocol: TCP
    name: driver-port
    port: 7077
    targetPort: 7077
  - protocol: TCP
    name: block-manager
    port: 10000
    targetPort: 10000
```
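
(Note: `clusterIP: None` makes this a headless service, so `spark-test-app-1-svc.default.svc` resolves directly to the driver pod; the two exposed ports match `spark.driver.port` and `spark.driver.blockManager.port` in the pod spec below.)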
pod.yaml:

```
apiVersion: v1
kind: Pod
metadata:
  name: spark-test-app-1
  labels:
    spark-app-selector: spark-test-app-1
spec:
  containers:
  - name: spark-test
    image: skonto/spark:k8s-client-fix
    imagePullPolicy: Always
    command:
      - 'sh'
      - '-c'
      -  "/opt/spark/bin/spark-submit
              --verbose
              --master k8s://https://kubernetes.default.svc
              --deploy-mode client
              --class org.apache.spark.examples.SparkPi
              --conf spark.app.name=spark
              --conf spark.executor.instances=1
              --conf spark.kubernetes.container.image=skonto/spark:k8s-client-fix
              --conf spark.kubernetes.container.image.pullPolicy=Always
              --conf spark.kubernetes.authenticate.oauthTokenFile=/var/run/secrets/kubernetes.io/serviceaccount/token
              --conf spark.kubernetes.authenticate.caCertFile=/var/run/secrets/kubernetes.io/serviceaccount/ca.crt
              --conf spark.executor.memory=500m
              --conf spark.executor.cores=1
              --conf spark.driver.host=spark-test-app-1-svc.default.svc
              --conf spark.driver.port=7077
              --conf spark.driver.blockManager.port=10000
              local:///opt/spark/examples/jars/spark-examples_2.11-2.4.0-SNAPSHOT.jar 1000000"
```
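
With the fix in place, the executors spawned by this job should be named after the configured app name (`spark.app.name=spark` above), e.g. `spark-<launchTime>-exec-1`, rather than colliding across apps.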

Closes #22405 from skonto/fix-k8s-client-mode-executor-names.

Authored-by: Stavros Kontopoulos <st...@lightbend.com>
Signed-off-by: Yinan Li <yn...@google.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3e75a9fa
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3e75a9fa
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3e75a9fa

Branch: refs/heads/master
Commit: 3e75a9fa24f8629d068b5fbbc7356ce2603fa58d
Parents: 8b702e1
Author: Stavros Kontopoulos <st...@lightbend.com>
Authored: Wed Sep 12 22:02:59 2018 -0700
Committer: Yinan Li <yn...@google.com>
Committed: Wed Sep 12 22:02:59 2018 -0700

----------------------------------------------------------------------
 .../spark/deploy/k8s/KubernetesConf.scala       | 13 +++++++++++-
 .../submit/KubernetesClientApplication.scala    | 21 ++++++++++++++++----
 .../k8s/ExecutorPodsAllocatorSuite.scala        | 16 ++++++++++++---
 3 files changed, 42 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3e75a9fa/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
index 3aa35d4..cae6e7d 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
@@ -24,6 +24,7 @@ import org.apache.spark.SparkConf
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.deploy.k8s.submit._
+import org.apache.spark.deploy.k8s.submit.KubernetesClientApplication._
 import org.apache.spark.internal.config.ConfigEntry
 
 
@@ -220,10 +221,20 @@ private[spark] object KubernetesConf {
     val executorVolumes = KubernetesVolumeUtils.parseVolumesWithPrefix(
       sparkConf, KUBERNETES_EXECUTOR_VOLUMES_PREFIX).map(_.get)
 
+    // If no prefix is defined then we are in pure client mode
+    // (not the one used by cluster mode inside the container)
+    val appResourceNamePrefix = {
+      if (sparkConf.getOption(KUBERNETES_EXECUTOR_POD_NAME_PREFIX.key).isEmpty) {
+        getResourceNamePrefix(getAppName(sparkConf))
+      } else {
+        sparkConf.get(KUBERNETES_EXECUTOR_POD_NAME_PREFIX)
+      }
+    }
+
     KubernetesConf(
       sparkConf.clone(),
       KubernetesExecutorSpecificConf(executorId, driverPod),
-      sparkConf.get(KUBERNETES_EXECUTOR_POD_NAME_PREFIX),
+      appResourceNamePrefix,
       appId,
       executorLabels,
       executorAnnotations,

http://git-wip-us.apache.org/repos/asf/spark/blob/3e75a9fa/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
index 986c950..edeaa38 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
@@ -211,11 +211,8 @@ private[spark] class KubernetesClientApplication extends SparkApplication {
     // considerably restrictive, e.g. must be no longer than 63 characters in length. So we generate
     // a unique app ID (captured by spark.app.id) in the format below.
     val kubernetesAppId = s"spark-${UUID.randomUUID().toString.replaceAll("-", "")}"
-    val launchTime = System.currentTimeMillis()
     val waitForAppCompletion = sparkConf.get(WAIT_FOR_APP_COMPLETION)
-    val kubernetesResourceNamePrefix = {
-      s"$appName-$launchTime".toLowerCase.replaceAll("\\.", "-")
-    }
+    val kubernetesResourceNamePrefix = KubernetesClientApplication.getResourceNamePrefix(appName)
     sparkConf.set(KUBERNETES_PYSPARK_PY_FILES, clientArguments.maybePyFiles.getOrElse(""))
     val kubernetesConf = KubernetesConf.createDriverConf(
       sparkConf,
@@ -254,3 +251,19 @@ private[spark] class KubernetesClientApplication extends SparkApplication {
     }
   }
 }
+
+private[spark] object KubernetesClientApplication {
+
+  def getAppName(conf: SparkConf): String = conf.getOption("spark.app.name").getOrElse("spark")
+
+  def getResourceNamePrefix(appName: String): String = {
+    val launchTime = System.currentTimeMillis()
+    s"$appName-$launchTime"
+      .trim
+      .toLowerCase
+      .replaceAll("\\s+", "-")
+      .replaceAll("\\.", "-")
+      .replaceAll("[^a-z0-9\\-]", "")
+      .replaceAll("-+", "-")
+  }
+}
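
A quick, hypothetical sanity check of the two new helpers (not part of the commit; since the object is `private[spark]`, this only compiles from within an `org.apache.spark` package):
```
import org.apache.spark.SparkConf
import org.apache.spark.deploy.k8s.submit.KubernetesClientApplication._

// getAppName falls back to "spark" when spark.app.name is unset.
assert(getAppName(new SparkConf(false)) == "spark")

// "Spark Pi 3.14" is lowercased, spaces and dots become hyphens, and the
// launch-time suffix keeps the prefix unique per submission.
val prefix = getResourceNamePrefix("Spark Pi 3.14")
assert(prefix.startsWith("spark-pi-3-14-"))
```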

http://git-wip-us.apache.org/repos/asf/spark/blob/3e75a9fa/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
index e847f85..0e617b0 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
@@ -167,13 +167,23 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
             executorSpecificConf.executorId,
             TEST_SPARK_APP_ID,
             Some(driverPod))
-          k8sConf.sparkConf.getAll.toMap == conf.getAll.toMap &&
+
+          // Set prefixes to a common string since KUBERNETES_EXECUTOR_POD_NAME_PREFIX
+          // has not been set for the tests, so KubernetesConf will use a random
+          // string for the prefix based on the app name, and this comparison would fail.
+          val k8sConfCopy = k8sConf
+            .copy(appResourceNamePrefix = "")
+            .copy(sparkConf = conf)
+          val expectedK8sConfCopy = expectedK8sConf
+            .copy(appResourceNamePrefix = "")
+            .copy(sparkConf = conf)
+
+            k8sConf.sparkConf.getAll.toMap == conf.getAll.toMap &&
             // Since KubernetesConf.createExecutorConf clones the SparkConf object, force
             // deep equality comparison for the SparkConf object and use object equality
             // comparison on all other fields.
-            k8sConf.copy(sparkConf = conf) == expectedK8sConf.copy(sparkConf = conf)
+            k8sConfCopy == expectedK8sConfCopy
         }
       }
     })
-
 }

