Posted to commits@spark.apache.org by pw...@apache.org on 2014/09/25 08:10:36 UTC

git commit: [SPARK-2778] [yarn] Add yarn integration tests.

Repository: spark
Updated Branches:
  refs/heads/master 8ca4ecb6a -> b8487713d


[SPARK-2778] [yarn] Add yarn integration tests.

This patch adds a couple of currently very simple integration tests
to make sure both client and cluster modes are working. The tests
don't do much yet other than run a simple job, but the plan is to
enhance them once the framework is in place.

The cluster tests are noisy, so all log output is redirected to a
file, as other tests do (see the log4j.properties added below).
Copying the conf around is ugly, but it's less work than messing
with Maven/sbt and having to clean up other projects.

Note the test is only added for yarn-stable. The code compiles
against yarn-alpha, but there are two issues I ran into that I
could not overcome:
- an old netty dependency kept creeping into the classpath and
  breaking Akka when using sbt; the old netty was correctly
  suppressed under Maven (a hypothetical sbt workaround is
  sketched after this list).
- MiniYARNCluster kept failing to execute containers because it
  did not create the NM's local dir itself; this is apparently
  known behavior, but I'm not sure how to work around it.

None of those issues are present with the stable Yarn.
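
For reference, the exclusion that Maven applies could be mirrored
under sbt with something like the following. This is a hypothetical
build.sbt fragment, not part of this change, and the version string
is an assumption standing in for ${yarn.version}:

    // Hypothetical sbt fragment mirroring the Maven <exclusions> in this
    // patch: keep the old org.jboss.netty artifact that
    // hadoop-yarn-server-tests drags in off the test classpath.
    libraryDependencies +=
      ("org.apache.hadoop" % "hadoop-yarn-server-tests" % "2.2.0" % "test")
        .classifier("tests")
        .exclude("org.jboss.netty", "netty")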

Also, these tests are a little slow to run. Apparently Spark doesn't
yet tag tests (so that these could be isolated in a "slow" batch),
so this is something to keep in mind.
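
ScalaTest does support tagging, so if Spark adopts it later the slow
suites could be marked and filtered at run time. A minimal sketch
with a hypothetical tag name (not part of this change):

    import org.scalatest.{FunSuite, Tag}

    // Hypothetical tag: run only these tests with "-n SlowTest",
    // or exclude them with "-l SlowTest".
    object SlowTest extends Tag("SlowTest")

    class ExampleSuite extends FunSuite {
      test("runs a job against a mini YARN cluster", SlowTest) {
        // slow integration logic would go here
      }
    }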

Author: Marcelo Vanzin <va...@cloudera.com>

Closes #2257 from vanzin/yarn-tests and squashes the following commits:

6d5b84e [Marcelo Vanzin] Fix wrong system property being set.
8b0933d [Marcelo Vanzin] Merge branch 'master' into yarn-tests
5c2b56f [Marcelo Vanzin] Use custom log4j conf for Yarn containers.
ec73f17 [Marcelo Vanzin] More review feedback.
67f5b02 [Marcelo Vanzin] Review feedback.
f01517c [Marcelo Vanzin] Review feedback.
68fbbbf [Marcelo Vanzin] Use older constructor available in older Hadoop releases.
d07ef9a [Marcelo Vanzin] Merge branch 'master' into yarn-tests
add8416 [Marcelo Vanzin] [SPARK-2778] [yarn] Add yarn integration tests.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b8487713
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b8487713
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b8487713

Branch: refs/heads/master
Commit: b8487713d3bf288a4f6fc149e6ee4cc8196d6e7d
Parents: 8ca4ecb
Author: Marcelo Vanzin <va...@cloudera.com>
Authored: Wed Sep 24 23:10:26 2014 -0700
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Wed Sep 24 23:10:26 2014 -0700

----------------------------------------------------------------------
 pom.xml                                         |  31 +++-
 .../spark/deploy/yarn/ApplicationMaster.scala   |  10 +-
 .../apache/spark/deploy/yarn/ClientBase.scala   |   2 +-
 .../deploy/yarn/ExecutorRunnableUtil.scala      |   2 +-
 yarn/pom.xml                                    |   3 +-
 yarn/stable/pom.xml                             |   9 ++
 yarn/stable/src/test/resources/log4j.properties |  28 ++++
 .../spark/deploy/yarn/YarnClusterSuite.scala    | 154 +++++++++++++++++++
 8 files changed, 229 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b8487713/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 520aed3..f3de097 100644
--- a/pom.xml
+++ b/pom.xml
@@ -714,6 +714,35 @@
       </dependency>
       <dependency>
         <groupId>org.apache.hadoop</groupId>
+        <artifactId>hadoop-yarn-server-tests</artifactId>
+        <version>${yarn.version}</version>
+        <classifier>tests</classifier>
+        <scope>test</scope>
+        <exclusions>
+          <exclusion>
+            <groupId>asm</groupId>
+            <artifactId>asm</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>org.ow2.asm</groupId>
+            <artifactId>asm</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>org.jboss.netty</groupId>
+            <artifactId>netty</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>javax.servlet</groupId>
+            <artifactId>servlet-api</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>commons-logging</groupId>
+            <artifactId>commons-logging</artifactId>
+          </exclusion>
+        </exclusions>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.hadoop</groupId>
         <artifactId>hadoop-yarn-server-web-proxy</artifactId>
         <version>${yarn.version}</version>
         <exclusions>
@@ -1187,7 +1216,7 @@
         <dependency>
           <groupId>org.apache.zookeeper</groupId>
           <artifactId>zookeeper</artifactId>
-          <version>3.4.5-mapr-1406</version> 
+          <version>3.4.5-mapr-1406</version>
         </dependency>
       </dependencies>
     </profile>

http://git-wip-us.apache.org/repos/asf/spark/blob/b8487713/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 9050808..b51daeb 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -401,17 +401,17 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
           // it has an uncaught exception thrown out.  It needs a shutdown hook to set SUCCEEDED.
           status = FinalApplicationStatus.SUCCEEDED
         } catch {
-          case e: InvocationTargetException => {
+          case e: InvocationTargetException =>
             e.getCause match {
-              case _: InterruptedException => {
+              case _: InterruptedException =>
                 // Reporter thread can interrupt to stop user class
-              }
+
+              case e => throw e
             }
-          }
         } finally {
           logDebug("Finishing main")
+          finalStatus = status
         }
-        finalStatus = status
       }
     }
     userClassThread.setName("Driver")
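
The exception-handling change above unwraps InvocationTargetException
so that real failures from the user class propagate, while the
InterruptedException the reporter thread uses to stop the user class
is swallowed. A minimal standalone sketch of that pattern, with
hypothetical names (not the actual ApplicationMaster code):

    import java.lang.reflect.InvocationTargetException

    def runUserMain(mainClass: Class[_], args: Array[String]): Unit = {
      val mainMethod = mainClass.getMethod("main", classOf[Array[String]])
      try {
        // passing the array as a single argument matches main(String[])
        mainMethod.invoke(null, args)
      } catch {
        case e: InvocationTargetException =>
          e.getCause match {
            case _: InterruptedException => // expected during shutdown
            case cause => throw cause // surface the real failure
          }
      }
    }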

http://git-wip-us.apache.org/repos/asf/spark/blob/b8487713/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
index 4870b0c..1cf19c1 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
@@ -348,7 +348,7 @@ private[spark] trait ClientBase extends Logging {
     }
 
     // For log4j configuration to reference
-    javaOpts += "-D=spark.yarn.app.container.log.dir=" + ApplicationConstants.LOG_DIR_EXPANSION_VAR
+    javaOpts += ("-Dspark.yarn.app.container.log.dir=" + ApplicationConstants.LOG_DIR_EXPANSION_VAR)
 
     val userClass =
       if (args.userClass != null) {

http://git-wip-us.apache.org/repos/asf/spark/blob/b8487713/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
index bbbf615..d7a7175 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
@@ -98,7 +98,7 @@ trait ExecutorRunnableUtil extends Logging {
     */
 
     // For log4j configuration to reference
-    javaOpts += "-D=spark.yarn.app.container.log.dir=" + ApplicationConstants.LOG_DIR_EXPANSION_VAR
+    javaOpts += ("-Dspark.yarn.app.container.log.dir=" + ApplicationConstants.LOG_DIR_EXPANSION_VAR)
 
     val commands = Seq(Environment.JAVA_HOME.$() + "/bin/java",
       "-server",

http://git-wip-us.apache.org/repos/asf/spark/blob/b8487713/yarn/pom.xml
----------------------------------------------------------------------
diff --git a/yarn/pom.xml b/yarn/pom.xml
index 815a736..8a7035c 100644
--- a/yarn/pom.xml
+++ b/yarn/pom.xml
@@ -140,7 +140,6 @@
         <configuration>
           <environmentVariables>
             <SPARK_HOME>${basedir}/../..</SPARK_HOME>
-            <SPARK_CLASSPATH>${spark.classpath}</SPARK_CLASSPATH>
           </environmentVariables>
         </configuration>
       </plugin>
@@ -148,7 +147,7 @@
 
     <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
     <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
-  
+
     <resources>
       <resource>
         <directory>../common/src/main/resources</directory>

http://git-wip-us.apache.org/repos/asf/spark/blob/b8487713/yarn/stable/pom.xml
----------------------------------------------------------------------
diff --git a/yarn/stable/pom.xml b/yarn/stable/pom.xml
index fd934b7..97eb054 100644
--- a/yarn/stable/pom.xml
+++ b/yarn/stable/pom.xml
@@ -32,4 +32,13 @@
   <packaging>jar</packaging>
   <name>Spark Project YARN Stable API</name>
 
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-tests</artifactId>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
 </project>

http://git-wip-us.apache.org/repos/asf/spark/blob/b8487713/yarn/stable/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/yarn/stable/src/test/resources/log4j.properties b/yarn/stable/src/test/resources/log4j.properties
new file mode 100644
index 0000000..26b73a1
--- /dev/null
+++ b/yarn/stable/src/test/resources/log4j.properties
@@ -0,0 +1,28 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set everything to be logged to the file core/target/unit-tests.log
+log4j.rootCategory=INFO, file
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.append=false
+log4j.appender.file.file=target/unit-tests.log
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n
+
+# Ignore messages below warning level from Jetty, because it's a bit verbose
+log4j.logger.org.eclipse.jetty=WARN
+org.eclipse.jetty.LEVEL=WARN

http://git-wip-us.apache.org/repos/asf/spark/blob/b8487713/yarn/stable/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
----------------------------------------------------------------------
diff --git a/yarn/stable/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/stable/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
new file mode 100644
index 0000000..857a444
--- /dev/null
+++ b/yarn/stable/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import java.io.File
+
+import scala.collection.JavaConversions._
+
+import com.google.common.base.Charsets
+import com.google.common.io.Files
+import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
+
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.server.MiniYARNCluster
+
+import org.apache.spark.{Logging, SparkConf, SparkContext}
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.util.Utils
+
+class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers {
+
+  // log4j configuration for the Yarn containers, so that their output is collected
+  // by Yarn instead of trying to overwrite unit-tests.log.
+  private val LOG4J_CONF = """
+    |log4j.rootCategory=DEBUG, console
+    |log4j.appender.console=org.apache.log4j.ConsoleAppender
+    |log4j.appender.console.target=System.err
+    |log4j.appender.console.layout=org.apache.log4j.PatternLayout
+    |log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
+    """.stripMargin
+
+  private var yarnCluster: MiniYARNCluster = _
+  private var tempDir: File = _
+  private var fakeSparkJar: File = _
+  private var oldConf: Map[String, String] = _
+
+  override def beforeAll() {
+    tempDir = Utils.createTempDir()
+
+    val logConfDir = new File(tempDir, "log4j")
+    logConfDir.mkdir()
+
+    val logConfFile = new File(logConfDir, "log4j.properties")
+    Files.write(LOG4J_CONF, logConfFile, Charsets.UTF_8)
+
+    val childClasspath = logConfDir.getAbsolutePath() + File.pathSeparator +
+      sys.props("java.class.path")
+
+    oldConf = sys.props.filter { case (k, v) => k.startsWith("spark.") }.toMap
+
+    yarnCluster = new MiniYARNCluster(getClass().getName(), 1, 1, 1)
+    yarnCluster.init(new YarnConfiguration())
+    yarnCluster.start()
+    yarnCluster.getConfig().foreach { e =>
+      sys.props += ("spark.hadoop." + e.getKey() -> e.getValue())
+    }
+
+    fakeSparkJar = File.createTempFile("sparkJar", null, tempDir)
+    sys.props += ("spark.yarn.jar" -> ("local:" + fakeSparkJar.getAbsolutePath()))
+    sys.props += ("spark.executor.instances" -> "1")
+    sys.props += ("spark.driver.extraClassPath" -> childClasspath)
+    sys.props += ("spark.executor.extraClassPath" -> childClasspath)
+
+    super.beforeAll()
+  }
+
+  override def afterAll() {
+    yarnCluster.stop()
+    sys.props.retain { case (k, v) => !k.startsWith("spark.") }
+    sys.props ++= oldConf
+    super.afterAll()
+  }
+
+  test("run Spark in yarn-client mode") {
+    var result = File.createTempFile("result", null, tempDir)
+    YarnClusterDriver.main(Array("yarn-client", result.getAbsolutePath()))
+    checkResult(result)
+  }
+
+  test("run Spark in yarn-cluster mode") {
+    val main = YarnClusterDriver.getClass.getName().stripSuffix("$")
+    var result = File.createTempFile("result", null, tempDir)
+
+    // The Client object will call System.exit() after the job is done, and we don't want
+    // that because it messes up the scalatest monitoring. So replicate some of what main()
+    // does here.
+    val args = Array("--class", main,
+      "--jar", "file:" + fakeSparkJar.getAbsolutePath(),
+      "--arg", "yarn-cluster",
+      "--arg", result.getAbsolutePath(),
+      "--num-executors", "1")
+    val sparkConf = new SparkConf()
+    val yarnConf = SparkHadoopUtil.get.newConfiguration(sparkConf)
+    val clientArgs = new ClientArguments(args, sparkConf)
+    new Client(clientArgs, yarnConf, sparkConf).run()
+    checkResult(result)
+  }
+
+  /**
+   * This is a workaround for an issue with yarn-cluster mode: the Client class will not provide
+   * any sort of error when the job process finishes successfully, but the job itself fails. So
+   * the tests enforce that something is written to a file after everything is ok to indicate
+   * that the job succeeded.
+   */
+  private def checkResult(result: File) = {
+    var resultString = Files.toString(result, Charsets.UTF_8)
+    resultString should be ("success")
+  }
+
+}
+
+private object YarnClusterDriver extends Logging with Matchers {
+
+  def main(args: Array[String]) = {
+    if (args.length != 2) {
+      System.err.println(
+        s"""
+        |Invalid command line: ${args.mkString(" ")}
+        |
+        |Usage: YarnClusterDriver [master] [result file]
+        """.stripMargin)
+      System.exit(1)
+    }
+
+    val sc = new SparkContext(new SparkConf().setMaster(args(0))
+      .setAppName("yarn \"test app\" 'with quotes' and \\back\\slashes and $dollarSigns"))
+    val status = new File(args(1))
+    var result = "failure"
+    try {
+      val data = sc.parallelize(1 to 4, 4).collect().toSet
+      data should be (Set(1, 2, 3, 4))
+      result = "success"
+    } finally {
+      sc.stop()
+      Files.write(result, status, Charsets.UTF_8)
+    }
+  }
+
+}
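
To run just the new suite, something like the command below should
work; the "yarn-stable" project id and the profile flag are
assumptions about the Spark build of the time:

    # run only YarnClusterSuite (project id and -Pyarn are assumptions)
    sbt/sbt -Pyarn "yarn-stable/test-only org.apache.spark.deploy.yarn.YarnClusterSuite"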

