You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by an...@apache.org on 2014/10/03 20:36:39 UTC

git commit: [SPARK-2778] [yarn] Add workaround for race in MiniYARNCluster.

Repository: spark
Updated Branches:
  refs/heads/master 22f8e1ee7 -> fbe8e9856


 [SPARK-2778] [yarn] Add workaround for race in MiniYARNCluster.

Sometimes the cluster's start() method returns before the configuration
having been updated, which is done by ClientRMService in, I assume, a
separate thread (otherwise there would be no race). That can cause tests
to fail if the old configuration data is read, since it will contain
the wrong RM address.

Author: Marcelo Vanzin <va...@cloudera.com>

Closes #2605 from vanzin/SPARK-2778 and squashes the following commits:

8d02ce0 [Marcelo Vanzin] Minor cleanup.
5bebee7 [Marcelo Vanzin] [SPARK-2778] [yarn] Add workaround for race in MiniYARNCluster.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fbe8e985
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fbe8e985
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fbe8e985

Branch: refs/heads/master
Commit: fbe8e9856b23262193105e7bf86075f516f0db25
Parents: 22f8e1e
Author: Marcelo Vanzin <va...@cloudera.com>
Authored: Fri Oct 3 11:36:24 2014 -0700
Committer: Andrew Or <an...@gmail.com>
Committed: Fri Oct 3 11:36:24 2014 -0700

----------------------------------------------------------------------
 .../spark/deploy/yarn/YarnClusterSuite.scala    | 35 +++++++++++++++++---
 1 file changed, 31 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/fbe8e985/yarn/stable/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
----------------------------------------------------------------------
diff --git a/yarn/stable/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/stable/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index 4b66356..a826b2a 100644
--- a/yarn/stable/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/yarn/stable/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.deploy.yarn
 
 import java.io.File
+import java.util.concurrent.TimeUnit
 
 import scala.collection.JavaConversions._
 
@@ -32,7 +33,7 @@ import org.apache.spark.{Logging, SparkConf, SparkContext}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.util.Utils
 
-class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers {
+class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers with Logging {
 
   // log4j configuration for the Yarn containers, so that their output is collected
   // by Yarn instead of trying to overwrite unit-tests.log.
@@ -66,7 +67,33 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers {
     yarnCluster = new MiniYARNCluster(getClass().getName(), 1, 1, 1)
     yarnCluster.init(new YarnConfiguration())
     yarnCluster.start()
-    yarnCluster.getConfig().foreach { e =>
+
+    // There's a race in MiniYARNCluster in which start() may return before the RM has updated
+    // its address in the configuration. You can see this in the logs by noticing that when
+    // MiniYARNCluster prints the address, it still has port "0" assigned, although later the
+    // test works sometimes:
+    //
+    //    INFO MiniYARNCluster: MiniYARN ResourceManager address: blah:0
+    //
+    // That log message prints the contents of the RM_ADDRESS config variable. If you check it
+    // later on, it looks something like this:
+    //
+    //    INFO YarnClusterSuite: RM address in configuration is blah:42631
+    //
+    // This hack loops for a bit waiting for the port to change, and fails the test if it hasn't
+    // done so in a timely manner (defined to be 10 seconds).
+    val config = yarnCluster.getConfig()
+    val deadline = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(10)
+    while (config.get(YarnConfiguration.RM_ADDRESS).split(":")(1) == "0") {
+      if (System.currentTimeMillis() > deadline) {
+        throw new IllegalStateException("Timed out waiting for RM to come up.")
+      }
+      logDebug("RM address still not set in configuration, waiting...")
+      TimeUnit.MILLISECONDS.sleep(100)
+    }
+
+    logInfo(s"RM address in configuration is ${config.get(YarnConfiguration.RM_ADDRESS)}")
+    config.foreach { e =>
       sys.props += ("spark.hadoop." + e.getKey() -> e.getValue())
     }
 
@@ -86,13 +113,13 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers {
     super.afterAll()
   }
 
-  ignore("run Spark in yarn-client mode") {
+  test("run Spark in yarn-client mode") {
     var result = File.createTempFile("result", null, tempDir)
     YarnClusterDriver.main(Array("yarn-client", result.getAbsolutePath()))
     checkResult(result)
   }
 
-  ignore("run Spark in yarn-cluster mode") {
+  test("run Spark in yarn-cluster mode") {
     val main = YarnClusterDriver.getClass.getName().stripSuffix("$")
     var result = File.createTempFile("result", null, tempDir)
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org