Posted to commits@spark.apache.org by an...@apache.org on 2015/02/02 03:28:58 UTC

spark git commit: [SPARK-1825] Make Windows Spark client work fine with Linux YARN cluster

Repository: spark
Updated Branches:
  refs/heads/master 1ca0a1014 -> 7712ed5b1


[SPARK-1825] Make Windows Spark client work fine with Linux YARN cluster

Modified environment variable references and path separators to a platform-independent style where possible.
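
In short: Environment.PWD.$() expands the variable using the delimiter syntax
of the OS the *client* runs on ("%PWD%" on Windows, "$PWD" elsewhere), which
breaks when a Windows client submits to a Linux YARN cluster. Hadoop 2.4 added
Environment.$$(), which emits the OS-neutral "{{PWD}}" token that the
NodeManager expands at container launch. A condensed sketch of the
reflection-based fallback this patch introduces (the full version is in
YarnSparkHadoopUtil below):

    import scala.util.Try
    import org.apache.hadoop.yarn.api.ApplicationConstants.Environment

    // Prefer $$() (Hadoop 2.4+, cross-platform token); fall back to $(),
    // which on older Hadoop versions only knows the client OS syntax.
    val expandMethod = Try(classOf[Environment].getMethod("$$"))
      .getOrElse(classOf[Environment].getMethod("$"))
    expandMethod.invoke(Environment.PWD)
    // => "{{PWD}}" on Hadoop 2.4+; "$PWD" or "%PWD%" otherwise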

Author: Masayoshi TSUZUKI <ts...@oss.nttdata.co.jp>

Closes #3943 from tsudukim/feature/SPARK-1825 and squashes the following commits:

ec4b865 [Masayoshi TSUZUKI] Rebased and modified as comments.
f8a1d5a [Masayoshi TSUZUKI] Merge branch 'master' of github.com:tsudukim/spark into feature/SPARK-1825
3d03d35 [Masayoshi TSUZUKI] [SPARK-1825] Make Windows Spark client work fine with Linux YARN cluster


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7712ed5b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7712ed5b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7712ed5b

Branch: refs/heads/master
Commit: 7712ed5b16d809e4cf63285b78f9b65d2588fb21
Parents: 1ca0a10
Author: Masayoshi TSUZUKI <ts...@oss.nttdata.co.jp>
Authored: Sun Feb 1 18:26:28 2015 -0800
Committer: Andrew Or <an...@databricks.com>
Committed: Sun Feb 1 18:28:55 2015 -0800

----------------------------------------------------------------------
 .../org/apache/spark/deploy/yarn/Client.scala   | 21 +++++++++----
 .../spark/deploy/yarn/ExecutorRunnable.scala    |  8 +++--
 .../spark/deploy/yarn/YarnSparkHadoopUtil.scala | 31 +++++++++++++++++++-
 .../apache/spark/deploy/yarn/ClientSuite.scala  | 18 ++++++++----
 .../deploy/yarn/YarnSparkHadoopUtilSuite.scala  | 25 ++++++++++++++++
 5 files changed, 89 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7712ed5b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index d4eeccf..1a18e65 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -400,7 +400,10 @@ private[spark] class Client(
     // Add Xmx for AM memory
     javaOpts += "-Xmx" + args.amMemory + "m"
 
-    val tmpDir = new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR)
+    val tmpDir = new Path(
+      YarnSparkHadoopUtil.expandEnvironment(Environment.PWD),
+      YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR
+    )
     javaOpts += "-Djava.io.tmpdir=" + tmpDir
 
     // TODO: Remove once cpuset version is pushed out.
@@ -491,7 +494,9 @@ private[spark] class Client(
           "--num-executors ", args.numExecutors.toString)
 
     // Command for the ApplicationMaster
-    val commands = prefixEnv ++ Seq(Environment.JAVA_HOME.$() + "/bin/java", "-server") ++
+    val commands = prefixEnv ++ Seq(
+        YarnSparkHadoopUtil.expandEnvironment(Environment.JAVA_HOME) + "/bin/java", "-server"
+      ) ++
       javaOpts ++ amArgs ++
       Seq(
         "1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
@@ -769,7 +774,9 @@ object Client extends Logging {
       env: HashMap[String, String],
       extraClassPath: Option[String] = None): Unit = {
     extraClassPath.foreach(addClasspathEntry(_, env))
-    addClasspathEntry(Environment.PWD.$(), env)
+    addClasspathEntry(
+      YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), env
+    )
 
     // Normally the users app.jar is last in case conflicts with spark jars
     if (sparkConf.getBoolean("spark.yarn.user.classpath.first", false)) {
@@ -783,7 +790,9 @@ object Client extends Logging {
     }
 
     // Append all jar files under the working directory to the classpath.
-    addClasspathEntry(Environment.PWD.$() + Path.SEPARATOR + "*", env)
+    addClasspathEntry(
+      YarnSparkHadoopUtil.expandEnvironment(Environment.PWD) + Path.SEPARATOR + "*", env
+    )
   }
 
   /**
@@ -838,7 +847,9 @@ object Client extends Logging {
       }
     }
     if (fileName != null) {
-      addClasspathEntry(Environment.PWD.$() + Path.SEPARATOR + fileName, env)
+      addClasspathEntry(
+        YarnSparkHadoopUtil.expandEnvironment(Environment.PWD) + Path.SEPARATOR + fileName, env
+      )
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/7712ed5b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index c537da9..ee2002a 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -142,7 +142,10 @@ class ExecutorRunnable(
     }
 
     javaOpts += "-Djava.io.tmpdir=" +
-      new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR)
+      new Path(
+        YarnSparkHadoopUtil.expandEnvironment(Environment.PWD),
+        YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR
+      )
 
     // Certain configs need to be passed here because they are needed before the Executor
     // registers with the Scheduler and transfers the spark configs. Since the Executor backend
@@ -181,7 +184,8 @@ class ExecutorRunnable(
     // For log4j configuration to reference
     javaOpts += ("-Dspark.yarn.app.container.log.dir=" + ApplicationConstants.LOG_DIR_EXPANSION_VAR)
 
-    val commands = prefixEnv ++ Seq(Environment.JAVA_HOME.$() + "/bin/java",
+    val commands = prefixEnv ++ Seq(
+      YarnSparkHadoopUtil.expandEnvironment(Environment.JAVA_HOME) + "/bin/java",
       "-server",
       // Kill if OOM is raised - leverage yarn's failure handling to cause rescheduling.
       // Not killing the task leaves various aspects of the executor and (to some extent) the jvm in

http://git-wip-us.apache.org/repos/asf/spark/blob/7712ed5b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index 4e39c1d..146b2c0 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -22,12 +22,15 @@ import java.util.regex.Matcher
 import java.util.regex.Pattern
 
 import scala.collection.mutable.HashMap
+import scala.util.Try
 
 import org.apache.hadoop.io.Text
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.security.Credentials
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.api.ApplicationConstants
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
 import org.apache.hadoop.yarn.api.records.{Priority, ApplicationAccessType}
 import org.apache.hadoop.conf.Configuration
 
@@ -102,7 +105,7 @@ object YarnSparkHadoopUtil {
    * If the map already contains this key, append the value to the existing value instead.
    */
   def addPathToEnvironment(env: HashMap[String, String], key: String, value: String): Unit = {
-    val newValue = if (env.contains(key)) { env(key) + File.pathSeparator + value } else value
+    val newValue = if (env.contains(key)) { env(key) + getClassPathSeparator + value } else value
     env.put(key, newValue)
   }
 
@@ -182,4 +185,30 @@ object YarnSparkHadoopUtil {
     )
   }
 
+  /**
+   * Expand an environment variable using the YARN API.
+   * If Environment.$$() is implemented, return its result.
+   * Otherwise, return the result of Environment.$().
+   * Note: $$() was added in Hadoop 2.4.
+   */
+  private lazy val expandMethod =
+    Try(classOf[Environment].getMethod("$$"))
+      .getOrElse(classOf[Environment].getMethod("$"))
+
+  def expandEnvironment(environment: Environment): String =
+    expandMethod.invoke(environment).asInstanceOf[String]
+
+  /**
+   * Get the class path separator using the YARN API.
+   * If ApplicationConstants.CLASS_PATH_SEPARATOR is defined, return it.
+   * Otherwise, return File.pathSeparator.
+   * Note: CLASS_PATH_SEPARATOR was added in Hadoop 2.4.
+   */
+  private lazy val classPathSeparatorField =
+    Try(classOf[ApplicationConstants].getField("CLASS_PATH_SEPARATOR"))
+      .getOrElse(classOf[File].getField("pathSeparator"))
+
+  def getClassPathSeparator(): String = {
+    classPathSeparatorField.get(null).asInstanceOf[String]
+  }
 }
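
As the new tests below verify, the two helpers are expected to behave as
follows (a usage sketch, not part of the diff):

    YarnSparkHadoopUtil.expandEnvironment(Environment.PWD)
    // Hadoop 2.4+:              "{{PWD}}"
    // older Hadoop, Windows:    "%PWD%"
    // older Hadoop, elsewhere:  "$PWD"

    YarnSparkHadoopUtil.getClassPathSeparator()
    // Hadoop 2.4+:              "<CPS>" (substituted by YARN at launch time)
    // older Hadoop:             File.pathSeparator (";" on Windows, ":" elsewhere)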

http://git-wip-us.apache.org/repos/asf/spark/blob/7712ed5b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
----------------------------------------------------------------------
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index aad5001..2bb3dcf 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -28,8 +28,6 @@ import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.mockito.Matchers._
 import org.mockito.Mockito._
-
-
 import org.scalatest.FunSuite
 import org.scalatest.Matchers
 
@@ -89,7 +87,7 @@ class ClientSuite extends FunSuite with Matchers {
 
     Client.populateClasspath(args, conf, sparkConf, env)
 
-    val cp = env("CLASSPATH").split(File.pathSeparator)
+    val cp = env("CLASSPATH").split(":|;|<CPS>")
     s"$SPARK,$USER,$ADDED".split(",").foreach({ entry =>
       val uri = new URI(entry)
       if (Client.LOCAL_SCHEME.equals(uri.getScheme())) {
@@ -98,8 +96,16 @@ class ClientSuite extends FunSuite with Matchers {
         cp should not contain (uri.getPath())
       }
     })
-    cp should contain (Environment.PWD.$())
-    cp should contain (s"${Environment.PWD.$()}${File.separator}*")
+    if (classOf[Environment].getMethods().exists(_.getName == "$$")) {
+      cp should contain("{{PWD}}")
+      cp should contain(s"{{PWD}}${Path.SEPARATOR}*")
+    } else if (Utils.isWindows) {
+      cp should contain("%PWD%")
+      cp should contain(s"%PWD%${Path.SEPARATOR}*")
+    } else {
+      cp should contain(Environment.PWD.$())
+      cp should contain(s"${Environment.PWD.$()}${File.separator}*")
+    }
     cp should not contain (Client.SPARK_JAR)
     cp should not contain (Client.APP_JAR)
   }
@@ -223,7 +229,7 @@ class ClientSuite extends FunSuite with Matchers {
 
   def newEnv = MutableHashMap[String, String]()
 
-  def classpath(env: MutableHashMap[String, String]) = env(Environment.CLASSPATH.name).split(":|;")
+  def classpath(env: MutableHashMap[String, String]) = env(Environment.CLASSPATH.name).split(":|;|<CPS>")
 
   def flatten(a: Option[Seq[String]], b: Option[Seq[String]]) = (a ++ b).flatten.toArray
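
Note on the test changes above: classpath entries may now be joined with the
literal "<CPS>" placeholder (returned by getClassPathSeparator() on Hadoop
2.4+ and substituted by YARN at container launch), so the suite splits on
":", ";", and "<CPS>" instead of on File.pathSeparator alone.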
 

http://git-wip-us.apache.org/repos/asf/spark/blob/7712ed5b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
----------------------------------------------------------------------
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
index 2cc5abb..b5a2db8 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
@@ -20,12 +20,15 @@ package org.apache.spark.deploy.yarn
 import java.io.{File, IOException}
 
 import com.google.common.io.{ByteStreams, Files}
+import org.apache.hadoop.yarn.api.ApplicationConstants
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.scalatest.{FunSuite, Matchers}
 
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType
 
 import org.apache.spark.{Logging, SecurityManager, SparkConf}
+import org.apache.spark.util.Utils
 
 
 class YarnSparkHadoopUtilSuite extends FunSuite with Matchers with Logging {
@@ -148,4 +151,26 @@ class YarnSparkHadoopUtilSuite extends FunSuite with Matchers with Logging {
     }
 
   }
+
+  test("test expandEnvironment result") {
+    val target = Environment.PWD
+    if (classOf[Environment].getMethods().exists(_.getName == "$$")) {
+      YarnSparkHadoopUtil.expandEnvironment(target) should be ("{{" + target + "}}")
+    } else if (Utils.isWindows) {
+      YarnSparkHadoopUtil.expandEnvironment(target) should be ("%" + target + "%")
+    } else {
+      YarnSparkHadoopUtil.expandEnvironment(target) should be ("$" + target)
+    }
+
+  }
+
+  test("test getClassPathSeparator result") {
+    if (classOf[ApplicationConstants].getFields().exists(_.getName == "CLASS_PATH_SEPARATOR")) {
+      YarnSparkHadoopUtil.getClassPathSeparator() should be ("<CPS>")
+    } else if (Utils.isWindows) {
+      YarnSparkHadoopUtil.getClassPathSeparator() should be (";")
+    } else {
+      YarnSparkHadoopUtil.getClassPathSeparator() should be (":")
+    }
+  }
 }

