Posted to commits@spark.apache.org by gu...@apache.org on 2020/12/15 00:03:58 UTC

[spark] branch branch-3.1 updated: [SPARK-33748][K8S] Respect environment variables and configurations for Python executables

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 010aeca  [SPARK-33748][K8S] Respect environment variables and configurations for Python executables
010aeca is described below

commit 010aecaaf4ba84e67e4a4caffacdd18f935b5f33
Author: HyukjinKwon <gu...@apache.org>
AuthorDate: Tue Dec 15 08:56:45 2020 +0900

    [SPARK-33748][K8S] Respect environment variables and configurations for Python executables
    
    ### What changes were proposed in this pull request?
    
    This PR proposes:
    
    - Respect the `PYSPARK_PYTHON` and `PYSPARK_DRIVER_PYTHON` environment variables, or the `spark.pyspark.python` and `spark.pyspark.driver.python` configurations, in Kubernetes just like other cluster types in Spark (see the example submission sketch after this list).
    
    - Deprecate `spark.kubernetes.pyspark.pythonVersion` and guide users to set the environment variables and configurations for Python executables instead.
        NOTE that `spark.kubernetes.pyspark.pythonVersion` is already a no-op configuration even without this PR: the default is `3`, and other values are disallowed.
    
    - In order for Python executable settings to be used consistently, fix the `spark.archives` option to unpack into the current working directory in the driver of Kubernetes' cluster mode. This behaviour is identical to YARN's cluster mode. By doing this, users can leverage Conda or virtualenv in cluster mode as below:
    
   ```bash
        conda create -y -n pyspark_conda_env -c conda-forge pyarrow pandas conda-pack
        conda activate pyspark_conda_env
        conda pack -f -o pyspark_conda_env.tar.gz
        PYSPARK_PYTHON=./environment/bin/python spark-submit --archives pyspark_conda_env.tar.gz#environment app.py
       ```
    
    - Removed several pieces of unused or unnecessary code, such as `extractS3Key` and `renameResourcesToLocalFS`.
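    
    For example, here is a minimal submission sketch (not part of this patch; the API server address, container image and application path are placeholders) of how the new settings can be used on Kubernetes:
    
    ```bash
    # Hypothetical spark-submit invocation; adjust the master URL, image and app to your cluster.
    bin/spark-submit \
      --master k8s://https://<k8s-apiserver-host>:<port> \
      --deploy-mode cluster \
      --conf spark.kubernetes.container.image=<pyspark-image> \
      --conf spark.pyspark.python=python3 \
      --conf spark.pyspark.driver.python=python3 \
      local:///opt/spark/examples/src/main/python/pi.py
    ```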
    
    ### Why are the changes needed?
    
    - To provide consistent support for PySpark by using the `PYSPARK_PYTHON` and `PYSPARK_DRIVER_PYTHON` environment variables, or the `spark.pyspark.python` and `spark.pyspark.driver.python` configurations.
    - To provide Conda and virtualenv support via the `spark.archives` option.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes:
    
    - `spark.kubernetes.pyspark.pythonVersion` is deprecated.
    - `PYSPARK_PYTHON` and `PYSPARK_DRIVER_PYTHON` environment variables, and `spark.pyspark.python` and `spark.pyspark.driver.python` configurations are respected.
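    
    As an illustrative check (not part of the patch; the namespace and pod name are placeholders), the driver pod created by such a submission should now carry these environment variables, which can be inspected with kubectl:
    
    ```bash
    # Hypothetical: dump the env entries of the driver container to confirm that
    # PYSPARK_PYTHON / PYSPARK_DRIVER_PYTHON are set as configured.
    kubectl -n <namespace> get pod <driver-pod-name> -o jsonpath='{.spec.containers[0].env}'
    ```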
    
    ### How was this patch tested?
    
    Manually tested via:
    
    ```bash
    minikube delete
    minikube start --cpus 12 --memory 16384
    kubectl create namespace spark-integration-test
    cat <<EOF | kubectl apply -f -
    apiVersion: v1
    kind: ServiceAccount
    metadata:
      name: spark
      namespace: spark-integration-test
    EOF
    kubectl create clusterrolebinding spark-role --clusterrole=edit --serviceaccount=spark-integration-test:spark --namespace=spark-integration-test
    dev/make-distribution.sh --pip --tgz -Pkubernetes
    resource-managers/kubernetes/integration-tests/dev/dev-run-integration-tests.sh --spark-tgz `pwd`/spark-3.2.0-SNAPSHOT-bin-3.2.0.tgz  --service-account spark --namespace spark-integration-test
    ```
    
    Unit tests were also added.
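    
    One way to run the added tests locally is via sbt (a sketch only; the `kubernetes` module name and the `-Pkubernetes` profile are assumed from this diff, and the exact command shape can differ between Spark versions):
    
    ```bash
    # Hypothetical: run the updated suite in the Kubernetes core module.
    build/sbt -Pkubernetes "kubernetes/testOnly *DriverCommandFeatureStepSuite"
    ```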
    
    Closes #30735 from HyukjinKwon/SPARK-33748.
    
    Authored-by: HyukjinKwon <gu...@apache.org>
    Signed-off-by: HyukjinKwon <gu...@apache.org>
    (cherry picked from commit a99a47ca1df689377dbfbf4dd7258f59aee2be44)
    Signed-off-by: HyukjinKwon <gu...@apache.org>
---
 .../org/apache/spark/deploy/SparkSubmit.scala      | 54 +++++++-------
 docs/running-on-kubernetes.md                      |  5 +-
 .../scala/org/apache/spark/deploy/k8s/Config.scala | 16 +++-
 .../org/apache/spark/deploy/k8s/Constants.scala    |  3 +-
 .../k8s/features/DriverCommandFeatureStep.scala    | 37 ++++++++--
 .../features/DriverCommandFeatureStepSuite.scala   | 57 +++++++++++++--
 .../src/main/dockerfiles/spark/entrypoint.sh       | 10 +--
 .../k8s/integrationtest/DepsTestsSuite.scala       | 85 ++++++++++++++++------
 .../k8s/integrationtest/KubernetesSuite.scala      |  6 +-
 .../integrationtest/KubernetesTestComponents.scala |  5 +-
 .../deploy/k8s/integrationtest/ProcessUtils.scala  |  5 +-
 .../spark/deploy/k8s/integrationtest/Utils.scala   |  9 ++-
 .../integration-tests/tests/py_container_checks.py |  2 +-
 ...tainer_checks.py => python_executable_check.py} | 32 +++++---
 14 files changed, 236 insertions(+), 90 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index ea293f0..bb3a20d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -31,7 +31,6 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 import scala.util.{Properties, Try}
 
-import org.apache.commons.io.FilenameUtils
 import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.conf.{Configuration => HadoopConfiguration}
 import org.apache.hadoop.fs.{FileSystem, Path}
@@ -387,20 +386,40 @@ private[spark] class SparkSubmit extends Logging {
         // Replace with the downloaded local jar path to avoid propagating hadoop compatible uris.
         // Executors will get the jars from the Spark file server.
         // Explicitly download the related files here
-        args.jars = renameResourcesToLocalFS(args.jars, localJars)
+        args.jars = localJars
         val filesLocalFiles = Option(args.files).map {
           downloadFileList(_, targetDir, sparkConf, hadoopConf, secMgr)
         }.orNull
-        val archiveLocalFiles = Option(args.archives).map { uri =>
-          val resolvedUri = Utils.resolveURI(uri)
-          val downloadedUri = downloadFileList(
-            UriBuilder.fromUri(resolvedUri).fragment(null).build().toString,
+        val archiveLocalFiles = Option(args.archives).map { uris =>
+          val resolvedUris = Utils.stringToSeq(uris).map(Utils.resolveURI)
+          val localArchives = downloadFileList(
+            resolvedUris.map(
+              UriBuilder.fromUri(_).fragment(null).build().toString).mkString(","),
             targetDir, sparkConf, hadoopConf, secMgr)
-          UriBuilder.fromUri(downloadedUri).fragment(resolvedUri.getFragment).build().toString
+
+          // SPARK-33748: this mimics the behaviour of Yarn cluster mode. If the driver is running
+          // in cluster mode, the archives should be available in the driver's current working
+          // directory too.
+          Utils.stringToSeq(localArchives).map(Utils.resolveURI).zip(resolvedUris).map {
+            case (localArchive, resolvedUri) =>
+              val source = new File(localArchive.getPath)
+              val dest = new File(
+                ".",
+                if (resolvedUri.getFragment != null) resolvedUri.getFragment else source.getName)
+              logInfo(
+                s"Unpacking an archive $resolvedUri " +
+                  s"from ${source.getAbsolutePath} to ${dest.getAbsolutePath}")
+              Utils.deleteRecursively(dest)
+              Utils.unpack(source, dest)
+
+              // Keep the URIs of local files with the given fragments.
+              UriBuilder.fromUri(
+                localArchive).fragment(resolvedUri.getFragment).build().toString
+          }.mkString(",")
         }.orNull
-        args.files = renameResourcesToLocalFS(args.files, filesLocalFiles)
-        args.archives = renameResourcesToLocalFS(args.archives, archiveLocalFiles)
-        args.pyFiles = renameResourcesToLocalFS(args.pyFiles, localPyFiles)
+        args.files = filesLocalFiles
+        args.archives = archiveLocalFiles
+        args.pyFiles = localPyFiles
       }
     }
 
@@ -836,21 +855,6 @@ private[spark] class SparkSubmit extends Logging {
     (childArgs.toSeq, childClasspath.toSeq, sparkConf, childMainClass)
   }
 
-  private def renameResourcesToLocalFS(resources: String, localResources: String): String = {
-    if (resources != null && localResources != null) {
-      val localResourcesSeq = Utils.stringToSeq(localResources)
-      Utils.stringToSeq(resources).map { resource =>
-        val filenameRemote = FilenameUtils.getName(new URI(resource).getPath)
-        localResourcesSeq.find { localUri =>
-          val filenameLocal = FilenameUtils.getName(new URI(localUri).getPath)
-          filenameRemote == filenameLocal
-        }.getOrElse(resource)
-      }.mkString(",")
-    } else {
-      resources
-    }
-  }
-
   // [SPARK-20328]. HadoopRDD calls into a Hadoop library that fetches delegation tokens with
   // renewer set to the YARN ResourceManager.  Since YARN isn't configured in Mesos or Kubernetes
   // mode, we must trick it into thinking we're YARN.
diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md
index e735c74..93c6f94 100644
--- a/docs/running-on-kubernetes.md
+++ b/docs/running-on-kubernetes.md
@@ -1087,7 +1087,10 @@ See the [configuration page](configuration.html) for information on Spark config
   <td><code>spark.kubernetes.pyspark.pythonVersion</code></td>
   <td><code>"3"</code></td>
   <td>
-   This sets the major Python version of the docker image used to run the driver and executor containers. Can be 3.
+   This sets the major Python version of the docker image used to run the driver and executor containers.
+   It can be only "3". This configuration was deprecated from Spark 3.1.0, and is effectively no-op.
+   Users should set 'spark.pyspark.python' and 'spark.pyspark.driver.python' configurations or
+   'PYSPARK_PYTHON' and 'PYSPARK_DRIVER_PYTHON' environment variables.
   </td>
   <td>2.4.0</td>
 </tr>
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
index 41194f3..8232ed3 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -20,6 +20,7 @@ import java.util.concurrent.TimeUnit
 
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.{PYSPARK_DRIVER_PYTHON, PYSPARK_PYTHON}
 import org.apache.spark.internal.config.ConfigBuilder
 
 private[spark] object Config extends Logging {
@@ -293,12 +294,19 @@ private[spark] object Config extends Logging {
 
   val PYSPARK_MAJOR_PYTHON_VERSION =
     ConfigBuilder("spark.kubernetes.pyspark.pythonVersion")
-      .doc("This sets the major Python version. Only 3 is available for Python3.")
+      .doc(
+        s"(Deprecated since Spark 3.1, please set '${PYSPARK_PYTHON.key}' and " +
+        s"'${PYSPARK_DRIVER_PYTHON.key}' configurations or $ENV_PYSPARK_PYTHON and " +
+        s"$ENV_PYSPARK_DRIVER_PYTHON environment variables instead.)")
       .version("2.4.0")
       .stringConf
-      .checkValue(pv => List("3").contains(pv),
-        "Ensure that major Python version is Python3")
-      .createWithDefault("3")
+      .checkValue("3" == _,
+        "Python 2 was dropped from Spark 3.1, and only 3 is allowed in " +
+          "this configuration. Note that this configuration was deprecated in Spark 3.1. " +
+          s"Please set '${PYSPARK_PYTHON.key}' and '${PYSPARK_DRIVER_PYTHON.key}' " +
+          s"configurations or $ENV_PYSPARK_PYTHON and $ENV_PYSPARK_DRIVER_PYTHON environment " +
+          "variables instead.")
+      .createOptional
 
   val KUBERNETES_KERBEROS_KRB5_FILE =
     ConfigBuilder("spark.kubernetes.kerberos.krb5.path")
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
index 4014a96..543ca12 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
@@ -74,7 +74,8 @@ private[spark] object Constants {
   val ENV_HADOOP_TOKEN_FILE_LOCATION = "HADOOP_TOKEN_FILE_LOCATION"
 
   // BINDINGS
-  val ENV_PYSPARK_MAJOR_PYTHON_VERSION = "PYSPARK_MAJOR_PYTHON_VERSION"
+  val ENV_PYSPARK_PYTHON = "PYSPARK_PYTHON"
+  val ENV_PYSPARK_DRIVER_PYTHON = "PYSPARK_DRIVER_PYTHON"
 
   // Pod spec templates
   val EXECUTOR_POD_SPEC_TEMPLATE_FILE_NAME = "pod-spec-template.yml"
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStep.scala
index d49381b..8015a1a 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStep.scala
@@ -24,6 +24,8 @@ import org.apache.spark.deploy.k8s._
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.deploy.k8s.submit._
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.{PYSPARK_DRIVER_PYTHON, PYSPARK_PYTHON}
 import org.apache.spark.launcher.SparkLauncher
 
 /**
@@ -31,7 +33,7 @@ import org.apache.spark.launcher.SparkLauncher
  * executors can also find the app code.
  */
 private[spark] class DriverCommandFeatureStep(conf: KubernetesDriverConf)
-  extends KubernetesFeatureConfigStep {
+  extends KubernetesFeatureConfigStep with Logging {
 
   override def configurePod(pod: SparkPod): SparkPod = {
     conf.mainAppResource match {
@@ -70,12 +72,37 @@ private[spark] class DriverCommandFeatureStep(conf: KubernetesDriverConf)
     SparkPod(pod.pod, driverContainer)
   }
 
+  // Exposed for testing purpose.
+  private[spark] def environmentVariables: Map[String, String] = sys.env
+
   private def configureForPython(pod: SparkPod, res: String): SparkPod = {
+    if (conf.get(PYSPARK_MAJOR_PYTHON_VERSION).isDefined) {
+      logWarning(
+          s"${PYSPARK_MAJOR_PYTHON_VERSION.key} was deprecated in Spark 3.1. " +
+          s"Please set '${PYSPARK_PYTHON.key}' and '${PYSPARK_DRIVER_PYTHON.key}' " +
+          s"configurations or $ENV_PYSPARK_PYTHON and $ENV_PYSPARK_DRIVER_PYTHON environment " +
+          "variables instead.")
+    }
+
     val pythonEnvs =
-      Seq(new EnvVarBuilder()
-          .withName(ENV_PYSPARK_MAJOR_PYTHON_VERSION)
-          .withValue(conf.get(PYSPARK_MAJOR_PYTHON_VERSION))
-        .build())
+      Seq(
+        conf.get(PYSPARK_PYTHON)
+          .orElse(environmentVariables.get(ENV_PYSPARK_PYTHON)).map { value =>
+          new EnvVarBuilder()
+            .withName(ENV_PYSPARK_PYTHON)
+            .withValue(value)
+            .build()
+        },
+        conf.get(PYSPARK_DRIVER_PYTHON)
+          .orElse(conf.get(PYSPARK_PYTHON))
+          .orElse(environmentVariables.get(ENV_PYSPARK_DRIVER_PYTHON))
+          .orElse(environmentVariables.get(ENV_PYSPARK_PYTHON)).map { value =>
+          new EnvVarBuilder()
+            .withName(ENV_PYSPARK_DRIVER_PYTHON)
+            .withValue(value)
+            .build()
+        }
+      ).flatten
 
     // re-write primary resource to be the remote one and upload the related file
     val newResName = KubernetesUtils
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStepSuite.scala
index a44d465..ebbb42f 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStepSuite.scala
@@ -22,6 +22,7 @@ import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.deploy.k8s._
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.deploy.k8s.submit._
+import org.apache.spark.internal.config.{PYSPARK_DRIVER_PYTHON, PYSPARK_PYTHON}
 
 class DriverCommandFeatureStepSuite extends SparkFunSuite {
 
@@ -50,12 +51,51 @@ class DriverCommandFeatureStepSuite extends SparkFunSuite {
       "--properties-file", SPARK_CONF_PATH,
       "--class", KubernetesTestConf.MAIN_CLASS,
       mainResource, "5", "7", "9"))
+  }
+
+  test("python executable precedence") {
+    val mainResource = "local:/main.py"
 
-    val envs = spec.pod.container.getEnv.asScala
-      .map { env => (env.getName, env.getValue) }
-      .toMap
-    val expected = Map(ENV_PYSPARK_MAJOR_PYTHON_VERSION -> "3")
-    assert(envs === expected)
+    val pythonExecutables = Seq(
+      (Some("conf_py"), Some("conf_driver_py"), Some("env_py"), Some("env_driver_py")),
+      (Some("conf_py"), None, Some("env_py"), Some("env_driver_py")),
+      (None, None, Some("env_py"), Some("env_driver_py")),
+      (None, None, Some("env_py"), None)
+    )
+
+    val expectedResults = Seq(
+      ("conf_py", "conf_driver_py"),
+      ("conf_py", "conf_py"),
+      ("env_py", "env_driver_py"),
+      ("env_py", "env_py")
+    )
+
+    pythonExecutables.zip(expectedResults).foreach { case (pythonExecutable, expected) =>
+      val sparkConf = new SparkConf(false)
+      val (confPy, confDriverPy, envPy, envDriverPy) = pythonExecutable
+      confPy.foreach(sparkConf.set(PYSPARK_PYTHON, _))
+      confDriverPy.foreach(sparkConf.set(PYSPARK_DRIVER_PYTHON, _))
+      val pythonEnvs = Map(
+        (
+          envPy.map(v => ENV_PYSPARK_PYTHON -> v :: Nil) ++
+          envDriverPy.map(v => ENV_PYSPARK_DRIVER_PYTHON -> v :: Nil)
+        ).flatten.toArray: _*)
+
+      val spec = applyFeatureStep(
+        PythonMainAppResource(mainResource),
+        conf = sparkConf,
+        appArgs = Array("foo"),
+        env = pythonEnvs)
+
+      val envs = spec.pod.container.getEnv.asScala
+        .map { env => (env.getName, env.getValue) }
+        .toMap
+
+      val (expectedEnvPy, expectedDriverPy) = expected
+      assert(envs === Map(
+        ENV_PYSPARK_PYTHON -> expectedEnvPy,
+        ENV_PYSPARK_DRIVER_PYTHON -> expectedDriverPy))
+    }
   }
 
   test("R resource") {
@@ -123,13 +163,16 @@ class DriverCommandFeatureStepSuite extends SparkFunSuite {
       resource: MainAppResource,
       conf: SparkConf = new SparkConf(false),
       appArgs: Array[String] = Array(),
-      proxyUser: Option[String] = None): KubernetesDriverSpec = {
+      proxyUser: Option[String] = None,
+      env: Map[String, String] = Map.empty[String, String]): KubernetesDriverSpec = {
     val kubernetesConf = KubernetesTestConf.createDriverConf(
       sparkConf = conf,
       mainAppResource = resource,
       appArgs = appArgs,
       proxyUser = proxyUser)
-    val step = new DriverCommandFeatureStep(kubernetesConf)
+    val step = new DriverCommandFeatureStep(kubernetesConf) {
+      private[spark] override val environmentVariables: Map[String, String] = env
+    }
     val pod = step.configurePod(SparkPod.initialPod())
     val props = step.getAdditionalPodSystemProperties()
     KubernetesDriverSpec(pod, Nil, props)
diff --git a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh
index c837e00..f722471 100755
--- a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh
+++ b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh
@@ -44,11 +44,11 @@ if [ -n "$SPARK_EXTRA_CLASSPATH" ]; then
   SPARK_CLASSPATH="$SPARK_CLASSPATH:$SPARK_EXTRA_CLASSPATH"
 fi
 
-if [ "$PYSPARK_MAJOR_PYTHON_VERSION" == "3" ]; then
-    pyv3="$(python3 -V 2>&1)"
-    export PYTHON_VERSION="${pyv3:7}"
-    export PYSPARK_PYTHON="python3"
-    export PYSPARK_DRIVER_PYTHON="python3"
+if ! [ -z ${PYSPARK_PYTHON+x} ]; then
+    export PYSPARK_PYTHON
+fi
+if ! [ -z ${PYSPARK_DRIVER_PYTHON+x} ]; then
+    export PYSPARK_DRIVER_PYTHON
 fi
 
 # If HADOOP_HOME is set and SPARK_DIST_CLASSPATH is not set, set it here so Hadoop jars are available to the executor.
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DepsTestsSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DepsTestsSuite.scala
index a15f7ff..0d15e03 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DepsTestsSuite.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DepsTestsSuite.scala
@@ -32,6 +32,7 @@ import org.apache.spark.deploy.k8s.integrationtest.DepsTestsSuite.{DEPS_TIMEOUT,
 import org.apache.spark.deploy.k8s.integrationtest.KubernetesSuite.{INTERVAL, MinikubeTag, TIMEOUT}
 import org.apache.spark.deploy.k8s.integrationtest.Utils.getExamplesJarName
 import org.apache.spark.deploy.k8s.integrationtest.backend.minikube.Minikube
+import org.apache.spark.internal.config.{ARCHIVES, PYSPARK_DRIVER_PYTHON, PYSPARK_PYTHON}
 
 private[spark] trait DepsTestsSuite { k8sSuite: KubernetesSuite =>
   import KubernetesSuite.k8sTestTag
@@ -135,7 +136,7 @@ private[spark] trait DepsTestsSuite { k8sSuite: KubernetesSuite =>
       .create(minioStatefulSet))
   }
 
- private def deleteMinioStorage(): Unit = {
+  private def deleteMinioStorage(): Unit = {
     kubernetesTestComponents
       .kubernetesClient
       .apps()
@@ -167,7 +168,7 @@ private[spark] trait DepsTestsSuite { k8sSuite: KubernetesSuite =>
     tryDepsTest {
       val fileName = Utils.createTempFile(FILE_CONTENTS, HOST_PATH)
       Utils.createTarGzFile(s"$HOST_PATH/$fileName", s"$HOST_PATH/$fileName.tar.gz")
-      sparkAppConf.set("spark.archives", s"$HOST_PATH/$fileName.tar.gz#test_tar_gz")
+      sparkAppConf.set(ARCHIVES.key, s"$HOST_PATH/$fileName.tar.gz#test_tar_gz")
       val examplesJar = Utils.getTestFileAbsolutePath(getExamplesJarName(), sparkHomeDir)
       runSparkRemoteCheckAndVerifyCompletion(appResource = examplesJar,
         appArgs = Array(s"test_tar_gz/$fileName"),
@@ -175,40 +176,81 @@ private[spark] trait DepsTestsSuite { k8sSuite: KubernetesSuite =>
     }
   }
 
+  test(
+    "SPARK-33748: Launcher python client respecting PYSPARK_PYTHON", k8sTestTag, MinikubeTag) {
+    val fileName = Utils.createTempFile(
+      """
+        |#!/usr/bin/env bash
+        |export IS_CUSTOM_PYTHON=1
+        |python3 "$@"
+      """.stripMargin, HOST_PATH)
+    Utils.createTarGzFile(s"$HOST_PATH/$fileName", s"$HOST_PATH/$fileName.tgz")
+    sparkAppConf.set(ARCHIVES.key, s"$HOST_PATH/$fileName.tgz#test_env")
+    val pySparkFiles = Utils.getTestFileAbsolutePath("python_executable_check.py", sparkHomeDir)
+    testPython(pySparkFiles,
+      Seq(
+        s"PYSPARK_PYTHON: ./test_env/$fileName",
+        s"PYSPARK_DRIVER_PYTHON: ./test_env/$fileName",
+        "Custom Python used on executor: True",
+        "Custom Python used on driver: True"),
+      env = Map("PYSPARK_PYTHON" -> s"./test_env/$fileName"))
+  }
+
+  test(
+    "SPARK-33748: Launcher python client respecting " +
+      s"${PYSPARK_PYTHON.key} and ${PYSPARK_DRIVER_PYTHON.key}", k8sTestTag, MinikubeTag) {
+    val fileName = Utils.createTempFile(
+      """
+        |#!/usr/bin/env bash
+        |export IS_CUSTOM_PYTHON=1
+        |python3 "$@"
+      """.stripMargin, HOST_PATH)
+    Utils.createTarGzFile(s"$HOST_PATH/$fileName", s"$HOST_PATH/$fileName.tgz")
+    sparkAppConf.set(ARCHIVES.key, s"$HOST_PATH/$fileName.tgz#test_env")
+    sparkAppConf.set(PYSPARK_PYTHON.key, s"./test_env/$fileName")
+    sparkAppConf.set(PYSPARK_DRIVER_PYTHON.key, "python3")
+    val pySparkFiles = Utils.getTestFileAbsolutePath("python_executable_check.py", sparkHomeDir)
+    testPython(pySparkFiles,
+      Seq(
+        s"PYSPARK_PYTHON: ./test_env/$fileName",
+        "PYSPARK_DRIVER_PYTHON: python3",
+        "Custom Python used on executor: True",
+        "Custom Python used on driver: False"))
+  }
+
   test("Launcher python client dependencies using a zip file", k8sTestTag, MinikubeTag) {
+    val pySparkFiles = Utils.getTestFileAbsolutePath("pyfiles.py", sparkHomeDir)
     val inDepsFile = Utils.getTestFileAbsolutePath("py_container_checks.py", sparkHomeDir)
     val outDepsFile = s"${inDepsFile.substring(0, inDepsFile.lastIndexOf("."))}.zip"
     Utils.createZipFile(inDepsFile, outDepsFile)
-    testPythonDeps(outDepsFile)
+    testPython(
+      pySparkFiles,
+      Seq(
+        "Python runtime version check is: True",
+        "Python environment version check is: True",
+        "Python runtime version check for executor is: True"),
+      Some(outDepsFile))
   }
 
-  private def testPythonDeps(depsFile: String): Unit = {
-    tryDepsTest({
-      val pySparkFiles = Utils.getTestFileAbsolutePath("pyfiles.py", sparkHomeDir)
+  private def testPython(
+      pySparkFiles: String,
+      expectedDriverLogs: Seq[String],
+      depsFile: Option[String] = None,
+      env: Map[String, String] = Map.empty[String, String]): Unit = {
+    tryDepsTest {
       setPythonSparkConfProperties(sparkAppConf)
       runSparkApplicationAndVerifyCompletion(
         appResource = pySparkFiles,
         mainClass = "",
-        expectedDriverLogOnCompletion = Seq(
-          "Python runtime version check is: True",
-          "Python environment version check is: True",
-          "Python runtime version check for executor is: True"),
+        expectedDriverLogOnCompletion = expectedDriverLogs,
         appArgs = Array("python3"),
         driverPodChecker = doBasicDriverPyPodCheck,
         executorPodChecker = doBasicExecutorPyPodCheck,
         appLocator = appLocator,
         isJVM = false,
-        pyFiles = Option(depsFile)) })
-  }
-
-  private def extractS3Key(data: String, key: String): String = {
-    data.split("\n")
-      .filter(_.contains(key))
-      .head
-      .split(":")
-      .last
-      .trim
-      .replaceAll("[,|\"]", "")
+        pyFiles = depsFile,
+        env = env)
+    }
   }
 
   private def createS3Bucket(accessKey: String, secretKey: String, endPoint: String): Unit = {
@@ -269,7 +311,6 @@ private[spark] trait DepsTestsSuite { k8sSuite: KubernetesSuite =>
 
   private def setPythonSparkConfProperties(conf: SparkAppConf): Unit = {
     sparkAppConf.set("spark.kubernetes.container.image", pyImage)
-      .set("spark.kubernetes.pyspark.pythonVersion", "3")
   }
 
   private def tryDepsTest(runTest: => Unit): Unit = {
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
index 7b2a2d0..494c825 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
@@ -274,7 +274,8 @@ class KubernetesSuite extends SparkFunSuite
       isJVM: Boolean,
       pyFiles: Option[String] = None,
       executorPatience: Option[(Option[Interval], Option[Timeout])] = None,
-      decommissioningTest: Boolean = false): Unit = {
+      decommissioningTest: Boolean = false,
+      env: Map[String, String] = Map.empty[String, String]): Unit = {
 
   // scalastyle:on argcount
     val appArguments = SparkAppArguments(
@@ -370,7 +371,8 @@ class KubernetesSuite extends SparkFunSuite
       TIMEOUT.value.toSeconds.toInt,
       sparkHomeDir,
       isJVM,
-      pyFiles)
+      pyFiles,
+      env)
 
     val driverPod = kubernetesTestComponents.kubernetesClient
       .pods()
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala
index 0bf01e6..0392008 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala
@@ -109,7 +109,8 @@ private[spark] object SparkAppLauncher extends Logging {
       timeoutSecs: Int,
       sparkHomeDir: Path,
       isJVM: Boolean,
-      pyFiles: Option[String] = None): Unit = {
+      pyFiles: Option[String] = None,
+      env: Map[String, String] = Map.empty[String, String]): Unit = {
     val sparkSubmitExecutable = sparkHomeDir.resolve(Paths.get("bin", "spark-submit"))
     logInfo(s"Launching a spark app with arguments $appArguments and conf $appConf")
     val preCommandLine = if (isJVM) {
@@ -130,6 +131,6 @@ private[spark] object SparkAppLauncher extends Logging {
       commandLine ++= appArguments.appArgs
     }
     logInfo(s"Launching a spark app with command line: ${commandLine.mkString(" ")}")
-    ProcessUtils.executeProcess(commandLine.toArray, timeoutSecs)
+    ProcessUtils.executeProcess(commandLine.toArray, timeoutSecs, env = env)
   }
 }
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/ProcessUtils.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/ProcessUtils.scala
index a1ecd48..cc05990 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/ProcessUtils.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/ProcessUtils.scala
@@ -19,6 +19,7 @@ package org.apache.spark.deploy.k8s.integrationtest
 import java.nio.charset.StandardCharsets
 import java.util.concurrent.TimeUnit
 
+import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 import scala.io.Source
 
@@ -32,8 +33,10 @@ object ProcessUtils extends Logging {
   def executeProcess(
       fullCommand: Array[String],
       timeout: Long,
-      dumpErrors: Boolean = true): Seq[String] = {
+      dumpErrors: Boolean = true,
+      env: Map[String, String] = Map.empty[String, String]): Seq[String] = {
     val pb = new ProcessBuilder().command(fullCommand: _*)
+    pb.environment().putAll(env.asJava)
     pb.redirectErrorStream(true)
     val proc = pb.start()
     val outputLines = new ArrayBuffer[String]
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala
index 5194431..cc25853 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala
@@ -153,6 +153,7 @@ object Utils extends Logging {
   }
 
   def createTarGzFile(inFile: String, outFile: String): Unit = {
+    val oFile = new File(outFile)
     val fileToTarGz = new File(inFile)
     Utils.tryWithResource(
       new FileInputStream(fileToTarGz)
@@ -160,15 +161,19 @@ object Utils extends Logging {
       Utils.tryWithResource(
         new TarArchiveOutputStream(
           new GzipCompressorOutputStream(
-            new FileOutputStream(
-              new File(outFile))))
+            new FileOutputStream(oFile)))
       ) { tOut =>
         val tarEntry = new TarArchiveEntry(fileToTarGz, fileToTarGz.getName)
+        // Each entry does not keep the file permission from the input file.
+        // Setting permissions in the input file do not work. Just simply set
+        // to 777.
+        tarEntry.setMode(0x81ff)
         tOut.putArchiveEntry(tarEntry)
         IOUtils.copy(fis, tOut)
         tOut.closeArchiveEntry()
         tOut.finish()
       }
     }
+    oFile.deleteOnExit()
   }
 }
diff --git a/resource-managers/kubernetes/integration-tests/tests/py_container_checks.py b/resource-managers/kubernetes/integration-tests/tests/py_container_checks.py
index f6b3be2..e6c0137 100644
--- a/resource-managers/kubernetes/integration-tests/tests/py_container_checks.py
+++ b/resource-managers/kubernetes/integration-tests/tests/py_container_checks.py
@@ -24,7 +24,7 @@ def version_check(python_env, major_python_version):
         These are various tests to test the Python container image.
         This file will be distributed via --py-files in the e2e tests.
     """
-    env_version = os.environ.get('PYSPARK_PYTHON')
+    env_version = os.environ.get('PYSPARK_PYTHON', 'python3')
     print("Python runtime version check is: " +
           str(sys.version_info[0] == major_python_version))
 
diff --git a/resource-managers/kubernetes/integration-tests/tests/py_container_checks.py b/resource-managers/kubernetes/integration-tests/tests/python_executable_check.py
similarity index 52%
copy from resource-managers/kubernetes/integration-tests/tests/py_container_checks.py
copy to resource-managers/kubernetes/integration-tests/tests/python_executable_check.py
index f6b3be2..89fd2aa 100644
--- a/resource-managers/kubernetes/integration-tests/tests/py_container_checks.py
+++ b/resource-managers/kubernetes/integration-tests/tests/python_executable_check.py
@@ -14,19 +14,27 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-
 import os
-import sys
 
+from pyspark.sql import SparkSession
+
+
+if __name__ == "__main__":
+    spark = SparkSession \
+        .builder \
+        .appName("PythonExecutableTest") \
+        .getOrCreate()
+
+    # Check python executable at executors
+    is_custom_python_executor = spark.range(1).rdd.map(
+        lambda _: "IS_CUSTOM_PYTHON" in os.environ).first()
+
+    print("PYSPARK_PYTHON: %s" % os.environ.get("PYSPARK_PYTHON"))
+    print("PYSPARK_DRIVER_PYTHON: %s" % os.environ.get("PYSPARK_DRIVER_PYTHON"))
+
+    print("Custom Python used on executor: %s" % is_custom_python_executor)
 
-def version_check(python_env, major_python_version):
-    """
-        These are various tests to test the Python container image.
-        This file will be distributed via --py-files in the e2e tests.
-    """
-    env_version = os.environ.get('PYSPARK_PYTHON')
-    print("Python runtime version check is: " +
-          str(sys.version_info[0] == major_python_version))
+    is_custom_python_driver = "IS_CUSTOM_PYTHON" in os.environ
+    print("Custom Python used on driver: %s" % is_custom_python_driver)
 
-    print("Python environment version check is: " +
-          str(env_version == python_env))
+    spark.stop()

