Posted to commits@spark.apache.org by js...@apache.org on 2018/03/16 08:22:16 UTC

spark git commit: [SPARK-23635][YARN] AM env variable should not overwrite same name env variable set through spark.executorEnv.

Repository: spark
Updated Branches:
  refs/heads/master ca83526de -> c95200048


[SPARK-23635][YARN] AM env variable should not overwrite same name env variable set through spark.executorEnv.

## What changes were proposed in this pull request?

In the current Spark on YARN code, the AM always copies its own `SPARK*` environment variables into the executor environment last, overwriting any same-named variables set through `spark.executorEnv.*`, so we cannot give executors different values.

To reproduce the issue, start spark-shell like:

```
./bin/spark-shell --master yarn-client --conf spark.executorEnv.SPARK_ABC=executor_val --conf  spark.yarn.appMasterEnv.SPARK_ABC=am_val
```

Then check the executor env variables with:

```
sc.parallelize(1 to 1).flatMap { i => sys.env.toSeq }.collect.foreach(println)
```

This always prints `am_val` instead of `executor_val`, so the AM should not overwrite explicitly set executor env variables.
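
The fix establishes a simple precedence: the `SPARK*` variables inherited from the AM's process environment are applied first, and the `spark.executorEnv.*` values are applied afterwards so they win. A minimal standalone sketch of that merge order (the map names here are illustrative, not the actual Spark code):

```
import scala.collection.mutable.HashMap

val env = new HashMap[String, String]()

// Applied first: SPARK* variables inherited from the AM's process environment.
val inheritedEnv = Map("SPARK_ABC" -> "am_val")
// Applied second: values from spark.executorEnv.*, which therefore take precedence.
val executorEnv = Map("SPARK_ABC" -> "executor_val")

inheritedEnv.foreach { case (k, v) => env(k) = v }
executorEnv.foreach { case (k, v) => env(k) = v }

assert(env("SPARK_ABC") == "executor_val")
```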

## How was this patch tested?

Added a unit test and verified in a local cluster.

Author: jerryshao <ss...@hortonworks.com>

Closes #20799 from jerryshao/SPARK-23635.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c9520004
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c9520004
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c9520004

Branch: refs/heads/master
Commit: c952000487ee003200221b3c4e25dcb06e359f0a
Parents: ca83526
Author: jerryshao <ss...@hortonworks.com>
Authored: Fri Mar 16 16:22:03 2018 +0800
Committer: jerryshao <ss...@hortonworks.com>
Committed: Fri Mar 16 16:22:03 2018 +0800

----------------------------------------------------------------------
 .../spark/deploy/yarn/ExecutorRunnable.scala    | 22 +++++++-----
 .../spark/deploy/yarn/YarnClusterSuite.scala    | 36 ++++++++++++++++++++
 2 files changed, 50 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c9520004/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index 3f4d236..ab08698 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -220,12 +220,6 @@ private[yarn] class ExecutorRunnable(
     val env = new HashMap[String, String]()
     Client.populateClasspath(null, conf, sparkConf, env, sparkConf.get(EXECUTOR_CLASS_PATH))
 
-    sparkConf.getExecutorEnv.foreach { case (key, value) =>
-      // This assumes each executor environment variable set here is a path
-      // This is kept for backward compatibility and consistency with hadoop
-      YarnSparkHadoopUtil.addPathToEnvironment(env, key, value)
-    }
-
     // lookup appropriate http scheme for container log urls
     val yarnHttpPolicy = conf.get(
       YarnConfiguration.YARN_HTTP_POLICY_KEY,
@@ -233,6 +227,20 @@ private[yarn] class ExecutorRunnable(
     )
     val httpScheme = if (yarnHttpPolicy == "HTTPS_ONLY") "https://" else "http://"
 
+    System.getenv().asScala.filterKeys(_.startsWith("SPARK"))
+      .foreach { case (k, v) => env(k) = v }
+
+    sparkConf.getExecutorEnv.foreach { case (key, value) =>
+      if (key == Environment.CLASSPATH.name()) {
+        // If the key of env variable is CLASSPATH, we assume it is a path and append it.
+        // This is kept for backward compatibility and consistency with hadoop
+        YarnSparkHadoopUtil.addPathToEnvironment(env, key, value)
+      } else {
+        // For other env variables, simply overwrite the value.
+        env(key) = value
+      }
+    }
+
     // Add log urls
     container.foreach { c =>
       sys.env.get("SPARK_USER").foreach { user =>
@@ -245,8 +253,6 @@ private[yarn] class ExecutorRunnable(
       }
     }
 
-    System.getenv().asScala.filterKeys(_.startsWith("SPARK"))
-      .foreach { case (k, v) => env(k) = v }
     env
   }
 }
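
Note on the hunk above: the `System.getenv` copy of `SPARK*` variables moves ahead of the `sparkConf.getExecutorEnv` loop, so user-specified executor values are written last and take precedence, and only `CLASSPATH` keeps the path-appending behavior via `YarnSparkHadoopUtil.addPathToEnvironment`. A rough sketch of that append semantics (illustrative only; the real helper delegates to YARN's environment utilities and classpath separator):

```
import scala.collection.mutable.HashMap

// Illustrative stand-in for YarnSparkHadoopUtil.addPathToEnvironment:
// append to an existing path-style value instead of replacing it.
def addPath(env: HashMap[String, String], key: String, value: String): Unit = {
  val sep = java.io.File.pathSeparator
  env(key) = env.get(key).filter(_.nonEmpty).map(_ + sep + value).getOrElse(value)
}

val env = new HashMap[String, String]()
addPath(env, "CLASSPATH", "/opt/libs/a.jar")
addPath(env, "CLASSPATH", "/opt/libs/b.jar")
// On Unix-like systems: env("CLASSPATH") == "/opt/libs/a.jar:/opt/libs/b.jar"
```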

http://git-wip-us.apache.org/repos/asf/spark/blob/c9520004/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index 33d400a..a129be7 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -225,6 +225,14 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
     finalState should be (SparkAppHandle.State.FAILED)
   }
 
+  test("executor env overwrite AM env in client mode") {
+    testExecutorEnv(true)
+  }
+
+  test("executor env overwrite AM env in cluster mode") {
+    testExecutorEnv(false)
+  }
+
   private def testBasicYarnApp(clientMode: Boolean, conf: Map[String, String] = Map()): Unit = {
     val result = File.createTempFile("result", null, tempDir)
     val finalState = runSpark(clientMode, mainClassName(YarnClusterDriver.getClass),
@@ -305,6 +313,17 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
     checkResult(finalState, executorResult, "OVERRIDDEN")
   }
 
+  private def testExecutorEnv(clientMode: Boolean): Unit = {
+    val result = File.createTempFile("result", null, tempDir)
+    val finalState = runSpark(clientMode, mainClassName(ExecutorEnvTestApp.getClass),
+      appArgs = Seq(result.getAbsolutePath),
+      extraConf = Map(
+        "spark.yarn.appMasterEnv.TEST_ENV" -> "am_val",
+        "spark.executorEnv.TEST_ENV" -> "executor_val"
+      )
+    )
+    checkResult(finalState, result, "true")
+  }
 }
 
 private[spark] class SaveExecutorInfo extends SparkListener {
@@ -526,3 +545,20 @@ private object SparkContextTimeoutApp {
   }
 
 }
+
+private object ExecutorEnvTestApp {
+
+  def main(args: Array[String]): Unit = {
+    val status = args(0)
+    val sparkConf = new SparkConf()
+    val sc = new SparkContext(sparkConf)
+    val executorEnvs = sc.parallelize(Seq(1)).flatMap { _ => sys.env }.collect().toMap
+    val result = sparkConf.getExecutorEnv.forall { case (k, v) =>
+      executorEnvs.get(k).contains(v)
+    }
+
+    Files.write(result.toString, new File(status), StandardCharsets.UTF_8)
+    sc.stop()
+  }
+
+}
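
With this change in place, rerunning the reproduction from the description should show the executor-side value winning; for example (output filtered to the `SPARK_ABC` entry, shown for illustration):

```
./bin/spark-shell --master yarn-client \
  --conf spark.executorEnv.SPARK_ABC=executor_val \
  --conf spark.yarn.appMasterEnv.SPARK_ABC=am_val

scala> sc.parallelize(1 to 1).flatMap { i => sys.env.toSeq }.collect.filter(_._1 == "SPARK_ABC").foreach(println)
(SPARK_ABC,executor_val)
```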

