Posted to commits@spark.apache.org by at...@apache.org on 2021/11/16 17:10:27 UTC

[spark] branch master updated: [SPARK-35672][CORE][YARN] Pass user classpath entries to executors using config instead of command line

This is an automated email from the ASF dual-hosted git repository.

attilapiros pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 693537f  [SPARK-35672][CORE][YARN] Pass user classpath entries to executors using config instead of command line
693537f is described below

commit 693537f7852b8a3b9f3cf3dc21eb1c132f64db0d
Author: Erik Krogen <xk...@apache.org>
AuthorDate: Tue Nov 16 18:08:48 2021 +0100

    [SPARK-35672][CORE][YARN] Pass user classpath entries to executors using config instead of command line
    
    ### What changes were proposed in this pull request?
    Refactor the logic for constructing the user classpath from `yarn.ApplicationMaster` into `yarn.Client` so that it can be leveraged on the executor side as well, instead of having the driver construct it and pass it to the executor via command-line arguments. A new method, `getUserClassPath`, is added to `CoarseGrainedExecutorBackend` which defaults to `Nil` (consistent with the existing behavior where non-YARN resource managers do not configure the user classpath). `YarnCoarseGrained [...]
    
    Please note that this is a re-submission of #32810, which was reverted in #34082 due to the issues described in [this comment](https://issues.apache.org/jira/browse/SPARK-35672?focusedCommentId=17419285&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17419285). This PR additionally includes the changes described in #34084 to resolve the issue, though this PR has been enhanced to properly handle escape strings, unlike #34084.
    
    ### Why are the changes needed?
    User-provided JARs are made available to executors using a custom classloader, so they do not appear on the standard Java classpath. Instead, they are passed as a list to the executor which then creates a classloader out of the URLs. Currently in the case of YARN, this list of JARs is crafted by the Driver (in `ExecutorRunnable`), which then passes the information to the executors (`CoarseGrainedExecutorBackend`) by specifying each JAR on the executor command line as `--user-class-pat [...]
    
    > /bin/bash: Argument list too long
    
    A [Google search](https://www.google.com/search?q=spark%20%22%2Fbin%2Fbash%3A%20argument%20list%20too%20long%22&oq=spark%20%22%2Fbin%2Fbash%3A%20argument%20list%20too%20long%22) shows that this is not a theoretical problem: it affects real users, including ours. Passing the list through the Spark configuration instead resolves the issue.
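
As a rough illustration of the mechanism described above (not Spark's actual code), the sketch below turns a list of JAR paths into URLs and builds a classloader from them. `UserClassPathSketch` and `toUrls` are hypothetical names, and the paths are placeholders.

```scala
import java.net.{URL, URLClassLoader}
import java.nio.file.Paths

object UserClassPathSketch {
  // Hypothetical helper: resolve each path against the current working
  // directory and convert it to an absolute file: URL.
  def toUrls(paths: Seq[String]): Array[URL] =
    paths.map(p => Paths.get(p).toAbsolutePath.toUri.toURL).toArray

  def main(args: Array[String]): Unit = {
    // Placeholder paths; the files do not need to exist to build the loader.
    val urls = toUrls(Seq("/tmp/a.jar", "relative/b.jar"))
    val loader = new URLClassLoader(urls, getClass.getClassLoader)
    loader.getURLs.foreach(println)
    loader.close()
  }
}
```

In the commit itself, the executor derives this list from the Spark configuration, so the JARs no longer appear on the executor command line.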
    
    ### Does this PR introduce _any_ user-facing change?
    There is one small behavioral change, which is a bug fix. Previously the `spark.yarn.config.gatewayPath` and `spark.yarn.config.replacementPath` options were only applied to executors, meaning they would not work for the driver when running in cluster mode. This appears to be a bug; the [documentation for this functionality](https://spark.apache.org/docs/latest/running-on-yarn.html) does not mention that it applies only to executors. This PR fixes that issue.
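
The gateway-to-cluster path substitution can be pictured with the sketch below. It is a simplification under stated assumptions, not the code in `Client.getClusterPath`: `GatewayPathSketch` and `toClusterPath` are hypothetical names, the helper does a plain string replacement, and the paths are the ones used in the `ClientSuite` test in this commit.

```scala
object GatewayPathSketch {
  // Hypothetical helper: swap the gateway path for the cluster-side path
  // when both options are set; otherwise return the path unchanged.
  def toClusterPath(
      path: String,
      gatewayPath: Option[String],
      replacementPath: Option[String]): String =
    (gatewayPath, replacementPath) match {
      case (Some(gateway), Some(replacement)) => path.replace(gateway, replacement)
      case _ => path
    }

  def main(args: Array[String]): Unit = {
    val gateway = Some("/local/matching/replace")
    val replacement = Some("/replaced/path")
    // Contains the gateway path, so it is rewritten.
    println(toClusterPath("/local/matching/replace/foo.jar", gateway, replacement))
    // Does not contain the gateway path, so it is left alone.
    println(toClusterPath("/local/not/matching/replace/foo.jar", gateway, replacement))
  }
}
```

With this change, the same substitution is also applied when the driver runs on the cluster, which is the behavioral fix described above.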
    
    Additionally, this fixes the main bash argument length issue, allowing for larger JAR lists to be passed successfully. Configuration of JARs is identical to before, and substitution of environment variables in `spark.jars` or `spark.yarn.config.replacementPath` works as expected.
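
To make the substitution rules concrete, here is a reduced, Unix-only sketch modelled on the `replaceEnvVars` method added in this commit. `EnvVarSketch` and `resolve` are hypothetical names, and the Windows and `{{VAR_NAME}}` forms are left out.

```scala
import scala.util.matching.Regex

object EnvVarSketch {
  private val name = "[A-Za-z_][A-Za-z0-9_]*"

  // Alternatives, in order: escaped backslash, escaped dollar, $VAR, ${VAR}.
  private val unixPattern: Regex =
    ("""\\\\|\\\$|\$(""" + name + """)|\$\{(""" + name + """)\}""").r

  // Replace variable references in `s` with values from `env`.
  // Unknown variables resolve to the empty string.
  def resolve(s: String, env: Map[String, String]): String =
    unixPattern.replaceAllIn(s, m =>
      Regex.quoteReplacement(m.matched match {
        case """\\""" => """\"""
        case """\$""" => "$"
        case str if str.startsWith("${") => env.getOrElse(m.group(2), "")
        case _ => env.getOrElse(m.group(1), "")
      }))

  def main(args: Array[String]): Unit = {
    val env = Map("HOME" -> "/home/u", "NAME" -> "foo")
    println(resolve("$HOME/lib/${NAME}.jar", env))
    println(resolve("""\$HOME""", env))
  }
}
```

`Regex.quoteReplacement` matters here: without it, a `$` or `\` in a variable's value would be interpreted as a group reference or escape by the replacement engine.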
    
    ### How was this patch tested?
    New unit tests were added in `YarnClusterSuite`. Also, we have been running a similar fix internally for 4 months with great success.
    
    Closes #34120 from xkrogen/xkrogen-SPARK-35672-yarn-classpath-list-take2.
    
    Authored-by: Erik Krogen <xk...@apache.org>
    Signed-off-by: attilapiros <pi...@gmail.com>
---
 .../executor/CoarseGrainedExecutorBackend.scala    | 17 ++---
 .../scala/org/apache/spark/executor/Executor.scala |  2 +
 .../CoarseGrainedExecutorBackendSuite.scala        | 17 ++---
 .../cluster/k8s/KubernetesExecutorBackend.scala    |  2 +-
 .../spark/deploy/yarn/ApplicationMaster.scala      |  9 +--
 .../org/apache/spark/deploy/yarn/Client.scala      | 34 ++++++++-
 .../spark/deploy/yarn/ExecutorRunnable.scala       | 12 ---
 .../spark/deploy/yarn/YarnSparkHadoopUtil.scala    | 75 ++++++++++++++++++-
 .../YarnCoarseGrainedExecutorBackend.scala         |  8 +-
 .../org/apache/spark/deploy/yarn/ClientSuite.scala | 35 +++++++++
 .../spark/deploy/yarn/YarnClusterSuite.scala       | 86 +++++++++++++++++++---
 .../deploy/yarn/YarnSparkHadoopUtilSuite.scala     | 40 ++++++++++
 12 files changed, 280 insertions(+), 57 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 4f63ada..43887a7 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -52,7 +52,6 @@ private[spark] class CoarseGrainedExecutorBackend(
     bindAddress: String,
     hostname: String,
     cores: Int,
-    userClassPath: Seq[URL],
     env: SparkEnv,
     resourcesFileOpt: Option[String],
     resourceProfile: ResourceProfile)
@@ -124,7 +123,7 @@ private[spark] class CoarseGrainedExecutorBackend(
    */
   private def createClassLoader(): MutableURLClassLoader = {
     val currentLoader = Utils.getContextOrSparkClassLoader
-    val urls = userClassPath.toArray
+    val urls = getUserClassPath.toArray
     if (env.conf.get(EXECUTOR_USER_CLASS_PATH_FIRST)) {
       new ChildFirstURLClassLoader(urls, currentLoader)
     } else {
@@ -149,6 +148,8 @@ private[spark] class CoarseGrainedExecutorBackend(
     }
   }
 
+  def getUserClassPath: Seq[URL] = Nil
+
   def extractLogUrls: Map[String, String] = {
     val prefix = "SPARK_LOG_URL_"
     sys.env.filterKeys(_.startsWith(prefix))
@@ -165,7 +166,7 @@ private[spark] class CoarseGrainedExecutorBackend(
     case RegisteredExecutor =>
       logInfo("Successfully registered with driver")
       try {
-        executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false,
+        executor = new Executor(executorId, hostname, env, getUserClassPath, isLocal = false,
           resources = _resources)
         driver.get.send(LaunchedExecutor(executorId))
       } catch {
@@ -398,7 +399,6 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
       cores: Int,
       appId: String,
       workerUrl: Option[String],
-      userClassPath: mutable.ListBuffer[URL],
       resourcesFileOpt: Option[String],
       resourceProfileId: Int)
 
@@ -406,7 +406,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
     val createFn: (RpcEnv, Arguments, SparkEnv, ResourceProfile) =>
       CoarseGrainedExecutorBackend = { case (rpcEnv, arguments, env, resourceProfile) =>
       new CoarseGrainedExecutorBackend(rpcEnv, arguments.driverUrl, arguments.executorId,
-        arguments.bindAddress, arguments.hostname, arguments.cores, arguments.userClassPath.toSeq,
+        arguments.bindAddress, arguments.hostname, arguments.cores,
         env, arguments.resourcesFileOpt, resourceProfile)
     }
     run(parseArguments(args, this.getClass.getCanonicalName.stripSuffix("$")), createFn)
@@ -494,7 +494,6 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
     var resourcesFileOpt: Option[String] = None
     var appId: String = null
     var workerUrl: Option[String] = None
-    val userClassPath = new mutable.ListBuffer[URL]()
     var resourceProfileId: Int = DEFAULT_RESOURCE_PROFILE_ID
 
     var argv = args.toList
@@ -525,9 +524,6 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
           // Worker url is used in spark standalone mode to enforce fate-sharing with worker
           workerUrl = Some(value)
           argv = tail
-        case ("--user-class-path") :: value :: tail =>
-          userClassPath += new URL(value)
-          argv = tail
         case ("--resourceProfileId") :: value :: tail =>
           resourceProfileId = value.toInt
           argv = tail
@@ -554,7 +550,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
     }
 
     Arguments(driverUrl, executorId, bindAddress, hostname, cores, appId, workerUrl,
-      userClassPath, resourcesFileOpt, resourceProfileId)
+      resourcesFileOpt, resourceProfileId)
   }
 
   private def printUsageAndExit(classNameForEntry: String): Unit = {
@@ -572,7 +568,6 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
       |   --resourcesFile <fileWithJSONResourceInformation>
       |   --app-id <appid>
       |   --worker-url <workerUrl>
-      |   --user-class-path <url>
       |   --resourceProfileId <id>
       |""".stripMargin)
     // scalastyle:on println
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index ef02c93..a505160 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -886,6 +886,8 @@ private[spark] class Executor(
     val urls = userClassPath.toArray ++ currentJars.keySet.map { uri =>
       new File(uri.split("/").last).toURI.toURL
     }
+    logInfo(s"Starting executor with user classpath (userClassPathFirst = $userClassPathFirst): " +
+        urls.mkString("'", ",", "'"))
     if (userClassPathFirst) {
       new ChildFirstURLClassLoader(urls, currentLoader)
     } else {
diff --git a/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala b/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala
index 3aa181c..9bbfdc7 100644
--- a/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.executor
 
 import java.io.File
-import java.net.URL
 import java.nio.ByteBuffer
 import java.util.Properties
 
@@ -56,7 +55,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
 
     // we don't really use this, just need it to get at the parser function
     val backend = new CoarseGrainedExecutorBackend( env.rpcEnv, "driverurl", "1", "host1", "host1",
-      4, Seq.empty[URL], env, None, resourceProfile)
+      4, env, None, resourceProfile)
     withTempDir { tmpDir =>
       val testResourceArgs: JObject = ("" -> "")
       val ja = JArray(List(testResourceArgs))
@@ -77,7 +76,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
     val env = createMockEnv(conf, serializer)
     // we don't really use this, just need it to get at the parser function
     val backend = new CoarseGrainedExecutorBackend( env.rpcEnv, "driverurl", "1", "host1", "host1",
-      4, Seq.empty[URL], env, None, ResourceProfile.getOrCreateDefaultProfile(conf))
+      4, env, None, ResourceProfile.getOrCreateDefaultProfile(conf))
     withTempDir { tmpDir =>
       val ra = ResourceAllocation(EXECUTOR_GPU_ID, Seq("0", "1"))
       val ja = Extraction.decompose(Seq(ra))
@@ -111,7 +110,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
     val env = createMockEnv(conf, serializer)
     // we don't really use this, just need it to get at the parser function
     val backend = new CoarseGrainedExecutorBackend( env.rpcEnv, "driverurl", "1", "host1", "host1",
-      4, Seq.empty[URL], env, None, resourceProfile)
+      4, env, None, resourceProfile)
 
     withTempDir { tmpDir =>
       val gpuArgs = ResourceAllocation(EXECUTOR_GPU_ID, Seq("0", "1"))
@@ -138,7 +137,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
     val env = createMockEnv(conf, serializer)
     // we don't really use this, just need it to get at the parser function
     val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", "host1", "host1",
-      4, Seq.empty[URL], env, None, ResourceProfile.getOrCreateDefaultProfile(conf))
+      4, env, None, ResourceProfile.getOrCreateDefaultProfile(conf))
 
     // not enough gpu's on the executor
     withTempDir { tmpDir =>
@@ -191,7 +190,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
     val env = createMockEnv(conf, serializer)
     // we don't really use this, just need it to get at the parser function
     val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", "host1", "host1",
-      4, Seq.empty[URL], env, None, resourceProfile)
+      4, env, None, resourceProfile)
 
     // executor resources < required
     withTempDir { tmpDir =>
@@ -222,7 +221,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
 
       // we don't really use this, just need it to get at the parser function
       val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", "host1", "host1",
-        4, Seq.empty[URL], env, None, ResourceProfile.getOrCreateDefaultProfile(conf))
+        4, env, None, ResourceProfile.getOrCreateDefaultProfile(conf))
 
       val parsedResources = backend.parseOrFindResources(None)
 
@@ -269,7 +268,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
 
     // we don't really use this, just need it to get at the parser function
     val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", "host1", "host1",
-      4, Seq.empty[URL], env, None, resourceProfile)
+      4, env, None, resourceProfile)
     val gpuArgs = ResourceAllocation(EXECUTOR_GPU_ID, Seq("0", "1"))
     val ja = Extraction.decompose(Seq(gpuArgs))
     val f1 = createTempJsonFile(dir, "resources", ja)
@@ -294,7 +293,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
       val rpcEnv = RpcEnv.create("1", "localhost", 0, conf, securityMgr)
       val env = createMockEnv(conf, serializer, Some(rpcEnv))
         backend = new CoarseGrainedExecutorBackend(env.rpcEnv, rpcEnv.address.hostPort, "1",
-        "host1", "host1", 4, Seq.empty[URL], env, None,
+        "host1", "host1", 4, env, None,
           resourceProfile = ResourceProfile.getOrCreateDefaultProfile(conf))
       assert(backend.taskResources.isEmpty)
 
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBackend.scala
index 481561a..dd06688 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBackend.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBackend.scala
@@ -50,7 +50,7 @@ private[spark] object KubernetesExecutorBackend extends Logging {
     val createFn: (RpcEnv, Arguments, SparkEnv, ResourceProfile, String) =>
       CoarseGrainedExecutorBackend = { case (rpcEnv, arguments, env, resourceProfile, execId) =>
         new CoarseGrainedExecutorBackend(rpcEnv, arguments.driverUrl, execId,
-        arguments.bindAddress, arguments.hostname, arguments.cores, Seq.empty,
+        arguments.bindAddress, arguments.hostname, arguments.cores,
         env, arguments.resourcesFileOpt, resourceProfile)
     }
     run(parseArguments(args, this.getClass.getCanonicalName.stripSuffix("$")), createFn)
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index a25edb0..42e661c 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -17,9 +17,9 @@
 
 package org.apache.spark.deploy.yarn
 
-import java.io.{File, IOException}
+import java.io.IOException
 import java.lang.reflect.{InvocationTargetException, Modifier}
-import java.net.{URI, URL, URLEncoder}
+import java.net.{URI, URLEncoder}
 import java.security.PrivilegedExceptionAction
 import java.util.concurrent.{TimeoutException, TimeUnit}
 
@@ -85,10 +85,7 @@ private[spark] class ApplicationMaster(
   private var metricsSystem: Option[MetricsSystem] = None
 
   private val userClassLoader = {
-    val classpath = Client.getUserClasspath(sparkConf)
-    val urls = classpath.map { entry =>
-      new URL("file:" + new File(entry.getPath()).getAbsolutePath())
-    }
+    val urls = Client.getUserClasspathUrls(sparkConf, isClusterMode)
 
     if (isClusterMode) {
       if (Client.isUserClassPathFirst(sparkConf, isDriver = true)) {
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 5de6b70..4763115 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -18,10 +18,10 @@
 package org.apache.spark.deploy.yarn
 
 import java.io.{FileSystem => _, _}
-import java.net.{InetAddress, UnknownHostException, URI}
+import java.net.{InetAddress, UnknownHostException, URI, URL}
 import java.nio.ByteBuffer
 import java.nio.charset.StandardCharsets
-import java.nio.file.Files
+import java.nio.file.{Files, Paths}
 import java.util.{Locale, Properties, UUID}
 import java.util.zip.{ZipEntry, ZipOutputStream}
 
@@ -1312,7 +1312,7 @@ private[spark] class Client(
 
 }
 
-private object Client extends Logging {
+private[spark] object Client extends Logging {
 
   // Alias for the user jar
   val APP_JAR_NAME: String = "__app__.jar"
@@ -1474,6 +1474,34 @@ private object Client extends Logging {
     (mainUri ++ secondaryUris).toArray
   }
 
+  /**
+   * Returns a list of local, absolute file URLs representing the user classpath. Note that this
+   * must be executed on the same host which will access the URLs, as it will resolve relative
+   * paths based on the current working directory, as well as environment variables.
+   * See SPARK-35672 for discussion of why it is necessary to do environment variable substitution.
+   *
+   * @param conf Spark configuration.
+   * @param useClusterPath Whether to use the 'cluster' path when resolving paths with the
+   *                       `local` scheme. This should be used when running on the cluster, but
+   *                       not when running on the gateway (i.e. for the driver in `client` mode).
+   * @return Array of local URLs ready to be passed to a [[java.net.URLClassLoader]].
+   */
+  def getUserClasspathUrls(conf: SparkConf, useClusterPath: Boolean): Array[URL] = {
+    Client.getUserClasspath(conf).map { uri =>
+      val inputPath = uri.getPath
+      val replacedFilePath = if (Utils.isLocalUri(uri.toString) && useClusterPath) {
+        Client.getClusterPath(conf, inputPath)
+      } else {
+        // Any other URI schemes should have been resolved by this point
+        assert(uri.getScheme == null || uri.getScheme == "file" || Utils.isLocalUri(uri.toString),
+          "getUserClasspath should only return 'file' or 'local' URIs but found: " + uri)
+        inputPath
+      }
+      val envVarResolvedFilePath = YarnSparkHadoopUtil.replaceEnvVars(replacedFilePath, sys.env)
+      Paths.get(envVarResolvedFilePath).toAbsolutePath.toUri.toURL
+    }
+  }
+
   private def getMainJarUri(mainJar: Option[String]): Option[URI] = {
     mainJar.flatMap { path =>
       val uri = Utils.resolveURI(path)
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index 717ce57..dbf4a0a 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.deploy.yarn
 
-import java.io.File
 import java.nio.ByteBuffer
 import java.util.Collections
 
@@ -190,16 +189,6 @@ private[yarn] class ExecutorRunnable(
     // For log4j configuration to reference
     javaOpts += ("-Dspark.yarn.app.container.log.dir=" + ApplicationConstants.LOG_DIR_EXPANSION_VAR)
 
-    val userClassPath = Client.getUserClasspath(sparkConf).flatMap { uri =>
-      val absPath =
-        if (new File(uri.getPath()).isAbsolute()) {
-          Client.getClusterPath(sparkConf, uri.getPath())
-        } else {
-          Client.buildPath(Environment.PWD.$(), uri.getPath())
-        }
-      Seq("--user-class-path", "file:" + absPath)
-    }.toSeq
-
     YarnSparkHadoopUtil.addOutOfMemoryErrorArgument(javaOpts)
     val commands = prefixEnv ++
       Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++
@@ -211,7 +200,6 @@ private[yarn] class ExecutorRunnable(
         "--cores", executorCores.toString,
         "--app-id", appId,
         "--resourceProfileId", resourceProfileId.toString) ++
-      userClassPath ++
       Seq(
         s"1>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stdout",
         s"2>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stderr")
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index 09766bf..f347e37 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -19,7 +19,9 @@ package org.apache.spark.deploy.yarn
 
 import java.util.regex.{Matcher, Pattern}
 
+import scala.collection.immutable.{Map => IMap}
 import scala.collection.mutable.{HashMap, ListBuffer}
+import scala.util.matching.Regex
 
 import org.apache.hadoop.yarn.api.ApplicationConstants
 import org.apache.hadoop.yarn.api.records.{ApplicationAccessType, ContainerId, Priority}
@@ -93,14 +95,83 @@ object YarnSparkHadoopUtil {
     }
   }
 
+  /**
+   * Regex pattern to match the name of an environment variable. Note that Unix variable naming
+   * conventions (alphanumeric plus underscore, case-sensitive, can't start with a digit)
+   * are used for both Unix and Windows, following the convention of Hadoop's `Shell` class
+   * (see specifically [[org.apache.hadoop.util.Shell.getEnvironmentVariableRegex]]).
+   */
+  private val envVarNameRegex: String = "[A-Za-z_][A-Za-z0-9_]*"
+
+  /**
+   * Note that this regex only supports the `$VAR_NAME` and `%VAR_NAME%` syntax, for Unix and
+   * Windows respectively, and does not perform any handling of escapes. The Unix `${VAR_NAME}`
+   * syntax is not supported.
+   */
   private val environmentVariableRegex: String = {
     if (Utils.isWindows) {
-      "%([A-Za-z_][A-Za-z0-9_]*?)%"
+      s"%($envVarNameRegex)%"
     } else {
-      "\\$([A-Za-z_][A-Za-z0-9_]*)"
+      s"\\$$($envVarNameRegex)"
     }
   }
 
+  // scalastyle:off line.size.limit
+  /**
+   * Replace environment variables in a string according to the same rules as
+   * [[org.apache.hadoop.yarn.api.ApplicationConstants.Environment]]:
+   * `$VAR_NAME` for Unix, `%VAR_NAME%` for Windows, and `{{VAR_NAME}}` for all OS.
+   * The `${VAR_NAME}` syntax is also supported for Unix.
+   * This supports escapes for `$` and `\` (on Unix) and `%` and `^` characters (on Windows), e.g.
+   * `\$FOO`, `^%FOO^%`, and `%%FOO%%` will be resolved to `$FOO`, `%FOO%`, and `%FOO%`,
+   * respectively, instead of being treated as variable names.
+   *
+   * @param unresolvedString The unresolved string which may contain variable references.
+   * @param env The System environment
+   * @param isWindows True iff running in a Windows environment
+   * @return The input string with variables replaced with their values from `env`
+   * @see [[https://pubs.opengroup.org/onlinepubs/000095399/basedefs/xbd_chap08.html Environment Variables (IEEE Std 1003.1-2017)]]
+   * @see [[https://en.wikibooks.org/wiki/Windows_Batch_Scripting#Quoting_and_escaping Windows Batch Scripting | Quoting and Escaping]]
+   */
+  // scalastyle:on line.size.limit
+  def replaceEnvVars(
+      unresolvedString: String,
+      env: IMap[String, String],
+      isWindows: Boolean = Utils.isWindows): String = {
+    val osResolvedString = if (isWindows) {
+      // ^% or %% can both be used as escapes for Windows
+      val windowsPattern = ("""(?i)(?:\^\^|\^%|%%|%(""" + envVarNameRegex + ")%)").r
+      windowsPattern.replaceAllIn(unresolvedString, m =>
+        Regex.quoteReplacement(m.matched match {
+          case "^^" => "^"
+          case "^%" => "%"
+          case "%%" => "%"
+          case _ => env.getOrElse(m.group(1), "")
+        })
+      )
+    } else {
+      val unixPattern =
+        ("""(?i)(?:\\\\|\\\$|\$(""" + envVarNameRegex + """)|\$\{(""" + envVarNameRegex + ")})").r
+      unixPattern.replaceAllIn(unresolvedString, m =>
+        Regex.quoteReplacement(m.matched match {
+          case """\\""" => """\"""
+          case """\$""" => """$"""
+          case str if str.startsWith("${") => env.getOrElse(m.group(2), "")
+          case _ => env.getOrElse(m.group(1), "")
+        })
+      )
+    }
+
+    // YARN uses `{{...}}` to represent OS-agnostic variable expansion strings. Normally the
+    // NodeManager would replace this string with an OS-specific replacement before launching
+    // the container. Here, it gets directly treated as an additional expansion string, which
+    // has the same net result.
+    // Ref: Javadoc for org.apache.hadoop.yarn.api.ApplicationConstants.Environment.$$()
+    val yarnPattern = ("""(?i)\{\{(""" + envVarNameRegex + ")}}").r
+    yarnPattern.replaceAllIn(osResolvedString,
+      m => Regex.quoteReplacement(env.getOrElse(m.group(1), "")))
+  }
+
   /**
    * Kill if OOM is raised - leverage yarn's failure handling to cause rescheduling.
    * Not killing the task leaves various aspects of the executor and (to some extent) the jvm in
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/executor/YarnCoarseGrainedExecutorBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/executor/YarnCoarseGrainedExecutorBackend.scala
index ce46ffa..3dd51f1 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/executor/YarnCoarseGrainedExecutorBackend.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/executor/YarnCoarseGrainedExecutorBackend.scala
@@ -21,6 +21,7 @@ import java.net.URL
 
 import org.apache.spark.SparkEnv
 import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.yarn.Client
 import org.apache.spark.internal.Logging
 import org.apache.spark.resource.ResourceProfile
 import org.apache.spark.rpc.RpcEnv
@@ -38,7 +39,6 @@ private[spark] class YarnCoarseGrainedExecutorBackend(
     bindAddress: String,
     hostname: String,
     cores: Int,
-    userClassPath: Seq[URL],
     env: SparkEnv,
     resourcesFile: Option[String],
     resourceProfile: ResourceProfile)
@@ -49,13 +49,15 @@ private[spark] class YarnCoarseGrainedExecutorBackend(
     bindAddress,
     hostname,
     cores,
-    userClassPath,
     env,
     resourcesFile,
     resourceProfile) with Logging {
 
   private lazy val hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(env.conf)
 
+  override def getUserClassPath: Seq[URL] =
+    Client.getUserClasspathUrls(env.conf, useClusterPath = true)
+
   override def extractLogUrls: Map[String, String] = {
     YarnContainerInfoHelper.getLogUrls(hadoopConfiguration, container = None)
       .getOrElse(Map())
@@ -73,7 +75,7 @@ private[spark] object YarnCoarseGrainedExecutorBackend extends Logging {
     val createFn: (RpcEnv, CoarseGrainedExecutorBackend.Arguments, SparkEnv, ResourceProfile) =>
       CoarseGrainedExecutorBackend = { case (rpcEnv, arguments, env, resourceProfile) =>
       new YarnCoarseGrainedExecutorBackend(rpcEnv, arguments.driverUrl, arguments.executorId,
-        arguments.bindAddress, arguments.hostname, arguments.cores, arguments.userClassPath.toSeq,
+        arguments.bindAddress, arguments.hostname, arguments.cores,
         env, arguments.resourcesFileOpt, resourceProfile)
     }
     val backendArgs = CoarseGrainedExecutorBackend.parseArguments(args,
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index a8815dc..7a5bc52 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.deploy.yarn
 
 import java.io.{File, FileInputStream, FileNotFoundException, FileOutputStream}
 import java.net.URI
+import java.nio.file.Paths
 import java.util.Properties
 import java.util.concurrent.ConcurrentHashMap
 
@@ -633,6 +634,40 @@ class ClientSuite extends SparkFunSuite with Matchers {
     }
   }
 
+  test("SPARK-35672: test Client.getUserClasspathUrls") {
+    val gatewayRootPath = "/local/matching/replace"
+    val replacementRootPath = "/replaced/path"
+    val conf = new SparkConf()
+        .set(SECONDARY_JARS, Seq(
+          s"local:$gatewayRootPath/foo.jar",
+          "local:/local/not/matching/replace/foo.jar",
+          "file:/absolute/file/path/foo.jar",
+          s"$gatewayRootPath/but-not-actually-local/foo.jar",
+          "/absolute/path/foo.jar",
+          "relative/path/foo.jar"
+        ))
+        .set(GATEWAY_ROOT_PATH, gatewayRootPath)
+        .set(REPLACEMENT_ROOT_PATH, replacementRootPath)
+
+    def assertUserClasspathUrls(cluster: Boolean, expectedReplacementPath: String): Unit = {
+      val expectedUrls = Seq(
+        Paths.get(APP_JAR_NAME).toAbsolutePath.toUri.toString,
+        s"file:$expectedReplacementPath/foo.jar",
+        "file:/local/not/matching/replace/foo.jar",
+        "file:/absolute/file/path/foo.jar",
+        // since this path wasn't a local URI, it should never be replaced
+        s"file:$gatewayRootPath/but-not-actually-local/foo.jar",
+        "file:/absolute/path/foo.jar",
+        Paths.get("relative/path/foo.jar").toAbsolutePath.toUri.toString
+      ).map(URI.create(_).toURL).toArray
+      assert(Client.getUserClasspathUrls(conf, cluster) === expectedUrls)
+    }
+    // assert that no replacement happens when cluster = false by expecting the replacement
+    // path to be the same as the original path
+    assertUserClasspathUrls(cluster = false, gatewayRootPath)
+    assertUserClasspathUrls(cluster = true, replacementRootPath)
+  }
+
   private val matching = Seq(
     ("files URI match test1", "file:///file1", "file:///file2"),
     ("files URI match test2", "file:///c:file1", "file://c:file2"),
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index 26ff3bf..9fd3c70 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -18,7 +18,9 @@
 package org.apache.spark.deploy.yarn
 
 import java.io.File
+import java.net.URL
 import java.nio.charset.StandardCharsets
+import java.nio.file.Paths
 import java.util.{HashMap => JHashMap}
 
 import scala.collection.mutable
@@ -150,12 +152,70 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
     checkResult(finalState, result)
   }
 
-  test("run Spark in yarn-client mode with additional jar") {
-    testWithAddJar(true)
+  test("SPARK-35672: run Spark in yarn-client mode with additional jar using URI scheme 'local'") {
+    val jarPath = createJarWithOriginalResourceFile().getPath
+    testWithAddJar(clientMode = true, s"local:$jarPath")
   }
 
-  test("run Spark in yarn-cluster mode with additional jar") {
-    testWithAddJar(false)
+  test("SPARK-35672: run Spark in yarn-cluster mode with additional jar using URI scheme 'local'") {
+    val jarPath = createJarWithOriginalResourceFile().getPath
+    testWithAddJar(clientMode = false, s"local:$jarPath")
+  }
+
+  test("SPARK-35672: run Spark in yarn-client mode with additional jar using URI scheme 'local' " +
+      "and gateway-replacement path") {
+    // Use the original jar URL, but set up the gateway/replacement configs such that if
+    // replacement occurs, things will break. This ensures the replacement doesn't apply to the
+    // driver in 'client' mode. Executors will fail in this case because they still apply the
+    // replacement in client mode.
+    val jarUrl = createJarWithOriginalResourceFile()
+    testWithAddJar(clientMode = true, s"local:${jarUrl.getPath}", Map(
+      GATEWAY_ROOT_PATH.key -> Paths.get(jarUrl.toURI).getParent.toString,
+      REPLACEMENT_ROOT_PATH.key -> "/nonexistent/path/"
+    ), expectExecutorFailure = true)
+  }
+
+  test("SPARK-35672: run Spark in yarn-cluster mode with additional jar using URI scheme 'local' " +
+      "and gateway-replacement path") {
+    // Put a prefix in front of the original jar URL which causes it to be an invalid path.
+    // Set up the gateway/replacement configs such that if replacement occurs, it is a valid
+    // path again (by removing the prefix). This ensures the replacement is applied.
+    val jarPath = createJarWithOriginalResourceFile().getPath
+    val gatewayPath = "/replaceme/nonexistent/"
+    testWithAddJar(clientMode = false, s"local:$gatewayPath$jarPath", Map(
+      GATEWAY_ROOT_PATH.key -> gatewayPath,
+      REPLACEMENT_ROOT_PATH.key -> ""
+    ))
+  }
+
+  test("SPARK-35672: run Spark in yarn-cluster mode with additional jar using URI scheme 'local' " +
+    "and gateway-replacement path containing an environment variable") {
+    // Treat the entire jar path as a string which needs to be replaced, and which will be replaced
+    // (using the gateway/replacement logic) by two environment variables, both of which have to be
+    // resolved properly for the resulting path to be correct. Two environment variables are
+    // used to test the two different styles of variable substitution (OS-style vs. YARN-style)
+    val jarPath = Paths.get(createJarWithOriginalResourceFile().toURI)
+
+    val envVarConfigs = for (
+      envVar <- Map("PARENT" -> jarPath.getParent, "FILENAME" -> jarPath.getFileName);
+      prefix <- Seq("spark.yarn.appMasterEnv.", "spark.executorEnv.")
+    ) yield s"$prefix${envVar._1}" -> envVar._2.toString
+
+    val osSpecificEnvVar = if (Utils.isWindows) "%PARENT%" else "${PARENT}"
+    testWithAddJar(clientMode = false, s"local:/replaceme", Map(
+      GATEWAY_ROOT_PATH.key -> "/replaceme",
+      REPLACEMENT_ROOT_PATH.key -> s"$osSpecificEnvVar/{{FILENAME}}"
+    ) ++ envVarConfigs)
+  }
+
+  test("SPARK-35672: run Spark in yarn-client mode with additional jar using URI scheme 'file'") {
+    val jarPath = createJarWithOriginalResourceFile().getPath
+    testWithAddJar(clientMode = true, s"file:$jarPath")
+  }
+
+  test("SPARK-35672: run Spark in yarn-cluster mode with additional jar using URI scheme 'file'") {
+    val jarPath = createJarWithOriginalResourceFile().getPath
+    testWithAddJar(clientMode = false, s"file:$jarPath")
   }
 
   test("run Spark in yarn-cluster mode unsuccessfully") {
@@ -286,16 +346,22 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
     checkResult(finalState, result)
   }
 
-  private def testWithAddJar(clientMode: Boolean): Unit = {
-    val originalJar = TestUtils.createJarWithFiles(Map("test.resource" -> "ORIGINAL"), tempDir)
+  private def createJarWithOriginalResourceFile(): URL =
+    TestUtils.createJarWithFiles(Map("test.resource" -> "ORIGINAL"), tempDir)
+
+  private def testWithAddJar(
+      clientMode: Boolean,
+      jarPath: String,
+      extraConf: Map[String, String] = Map(),
+      expectExecutorFailure: Boolean = false): Unit = {
     val driverResult = File.createTempFile("driver", null, tempDir)
     val executorResult = File.createTempFile("executor", null, tempDir)
     val finalState = runSpark(clientMode, mainClassName(YarnClasspathTest.getClass),
-      appArgs = Seq(driverResult.getAbsolutePath(), executorResult.getAbsolutePath()),
-      extraClassPath = Seq(originalJar.getPath()),
-      extraJars = Seq("local:" + originalJar.getPath()))
+      appArgs = Seq(driverResult.getAbsolutePath, executorResult.getAbsolutePath),
+      extraJars = Seq(jarPath),
+      extraConf = extraConf)
     checkResult(finalState, driverResult, "ORIGINAL")
-    checkResult(finalState, executorResult, "ORIGINAL")
+    checkResult(finalState, executorResult, if (expectExecutorFailure) "failure" else "ORIGINAL")
   }
 
   private def testPySpark(
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
index 5b762f6..a36d1fa 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
@@ -141,4 +141,44 @@ class YarnSparkHadoopUtilSuite extends SparkFunSuite with Matchers with Logging
     }
 
   }
+
+  test("SPARK-35672: test replaceEnvVars in Unix mode") {
+    Map(
+      "F_O_O$FOO$BAR" -> "F_O_OBAR",
+      "$FOO" -> "BAR",
+      "$F_O_O$FOO" -> "BarBAR",
+      "${FOO}" -> "BAR",
+      "$FOO.baz$BAR" -> "BAR.baz",
+      "{{FOO}}" -> "BAR",
+      "{{FOO}}$FOO" -> "BARBAR",
+      "%FOO%" -> "%FOO%",
+      """\$FOO\\\$FOO\${FOO}\\$FOO\\\\""" -> """$FOO\$FOO${FOO}\BAR\\"""
+    ).foreach { case (input, expected) =>
+      withClue(s"input string `$input`: ") {
+        val replaced = YarnSparkHadoopUtil
+          .replaceEnvVars(input, Map("F_O_O" -> "Bar", "FOO" -> "BAR"), isWindows = false)
+        assert(replaced === expected)
+      }
+    }
+  }
+
+  test("SPARK-35672: test replaceEnvVars in Windows mode") {
+    Map(
+      "Foo%FOO%%BAR%" -> "FooBAR",
+      "%FOO%" -> "BAR",
+      "%F_O_O%%FOO%" -> "BarBAR",
+      "{{FOO}}%FOO%" -> "BARBAR",
+      "$FOO" -> "$FOO",
+      "${FOO}" -> "${FOO}",
+      "%%FOO%%%FOO%%%%%%FOO%" -> "%FOO%BAR%%BAR",
+      "%FOO%^^^%FOO^%^FOO^^^^%FOO%" -> "BAR^%FOO%^FOO^^BAR"
+    ).foreach { case (input, expected) =>
+      withClue(s"input string `$input`: ") {
+        val replaced = YarnSparkHadoopUtil
+          .replaceEnvVars(input, Map("F_O_O" -> "Bar", "FOO" -> "BAR"), isWindows = true)
+        assert(replaced === expected)
+      }
+    }
+  }
+
 }
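
The Unix-mode expectations in the `replaceEnvVars` test above can be reproduced with a small stand-alone sketch. This is an illustration only and is not the implementation in `YarnSparkHadoopUtil`: the object name `EnvVarSketch`, the method `expandUnix`, and both regular expressions are assumptions made for this example. It expands `$NAME` and `${NAME}`, honors the escapes `\\` and `\$`, and then expands YARN-style `{{NAME}}`. Unset variables become the empty string.

```scala
import scala.util.matching.Regex

// Hypothetical helper for illustration; not part of Spark.
object EnvVarSketch {
  // Alternatives are tried left to right, so the escapes (\\ and \$)
  // are consumed before a variable reference can match.
  private val UnixToken: Regex =
    """\\\\|\\\$|\$\{([A-Za-z_][A-Za-z0-9_]*)\}|\$([A-Za-z_][A-Za-z0-9_]*)""".r

  // YARN's OS-agnostic expansion syntax: {{NAME}}
  private val YarnToken: Regex = """\{\{([A-Za-z_][A-Za-z0-9_]*)\}\}""".r

  def expandUnix(input: String, env: Map[String, String]): String = {
    val osResolved = UnixToken.replaceAllIn(input, m =>
      // quoteReplacement keeps '$' and '\' in the result literal
      Regex.quoteReplacement(m.matched match {
        case "\\\\" => "\\" // escaped backslash
        case "\\$" => "$"   // escaped dollar sign
        case _ =>
          // group(1) is set for ${NAME}, group(2) for $NAME
          val name = Option(m.group(1)).getOrElse(m.group(2))
          env.getOrElse(name, "")
      }))
    // Second pass: resolve any {{NAME}} references
    YarnToken.replaceAllIn(osResolved, m =>
      Regex.quoteReplacement(env.getOrElse(m.group(1), "")))
  }
}
```

With `Map("F_O_O" -> "Bar", "FOO" -> "BAR")` as the environment, `EnvVarSketch.expandUnix("$FOO.baz$BAR", env)` yields `BAR.baz` and `EnvVarSketch.expandUnix("{{FOO}}$FOO", env)` yields `BARBAR`, matching the corresponding rows in the test. Because this sketch runs two passes, a variable value that itself contains `{{NAME}}` would be expanded again. Whether that is desirable depends on the caller.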
