You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by an...@apache.org on 2014/10/03 20:36:39 UTC
git commit: [SPARK-2778] [yarn] Add workaround for race in
MiniYARNCluster.
Repository: spark
Updated Branches:
refs/heads/master 22f8e1ee7 -> fbe8e9856
[SPARK-2778] [yarn] Add workaround for race in MiniYARNCluster.
Sometimes the cluster's start() method returns before the configuration
has been updated, which is done by ClientRMService in, I assume, a
separate thread (otherwise there would be no race). That can cause tests
to fail if the old configuration data is read, since it will contain
the wrong RM address.
Author: Marcelo Vanzin <va...@cloudera.com>
Closes #2605 from vanzin/SPARK-2778 and squashes the following commits:
8d02ce0 [Marcelo Vanzin] Minor cleanup.
5bebee7 [Marcelo Vanzin] [SPARK-2778] [yarn] Add workaround for race in MiniYARNCluster.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fbe8e985
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fbe8e985
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fbe8e985
Branch: refs/heads/master
Commit: fbe8e9856b23262193105e7bf86075f516f0db25
Parents: 22f8e1e
Author: Marcelo Vanzin <va...@cloudera.com>
Authored: Fri Oct 3 11:36:24 2014 -0700
Committer: Andrew Or <an...@gmail.com>
Committed: Fri Oct 3 11:36:24 2014 -0700
----------------------------------------------------------------------
.../spark/deploy/yarn/YarnClusterSuite.scala | 35 +++++++++++++++++---
1 file changed, 31 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/fbe8e985/yarn/stable/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
----------------------------------------------------------------------
diff --git a/yarn/stable/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/stable/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index 4b66356..a826b2a 100644
--- a/yarn/stable/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/yarn/stable/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -18,6 +18,7 @@
package org.apache.spark.deploy.yarn
import java.io.File
+import java.util.concurrent.TimeUnit
import scala.collection.JavaConversions._
@@ -32,7 +33,7 @@ import org.apache.spark.{Logging, SparkConf, SparkContext}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.util.Utils
-class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers {
+class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers with Logging {
// log4j configuration for the Yarn containers, so that their output is collected
// by Yarn instead of trying to overwrite unit-tests.log.
@@ -66,7 +67,33 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers {
yarnCluster = new MiniYARNCluster(getClass().getName(), 1, 1, 1)
yarnCluster.init(new YarnConfiguration())
yarnCluster.start()
- yarnCluster.getConfig().foreach { e =>
+
+ // There's a race in MiniYARNCluster in which start() may return before the RM has updated
+ // its address in the configuration. You can see this in the logs by noticing that when
+ // MiniYARNCluster prints the address, it still has port "0" assigned, although later the
+ // test works sometimes:
+ //
+ // INFO MiniYARNCluster: MiniYARN ResourceManager address: blah:0
+ //
+ // That log message prints the contents of the RM_ADDRESS config variable. If you check it
+ // later on, it looks something like this:
+ //
+ // INFO YarnClusterSuite: RM address in configuration is blah:42631
+ //
+ // This hack loops for a bit waiting for the port to change, and fails the test if it hasn't
+ // done so in a timely manner (defined to be 10 seconds).
+ val config = yarnCluster.getConfig()
+ val deadline = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(10)
+ while (config.get(YarnConfiguration.RM_ADDRESS).split(":")(1) == "0") {
+ if (System.currentTimeMillis() > deadline) {
+ throw new IllegalStateException("Timed out waiting for RM to come up.")
+ }
+ logDebug("RM address still not set in configuration, waiting...")
+ TimeUnit.MILLISECONDS.sleep(100)
+ }
+
+ logInfo(s"RM address in configuration is ${config.get(YarnConfiguration.RM_ADDRESS)}")
+ config.foreach { e =>
sys.props += ("spark.hadoop." + e.getKey() -> e.getValue())
}
@@ -86,13 +113,13 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers {
super.afterAll()
}
- ignore("run Spark in yarn-client mode") {
+ test("run Spark in yarn-client mode") {
var result = File.createTempFile("result", null, tempDir)
YarnClusterDriver.main(Array("yarn-client", result.getAbsolutePath()))
checkResult(result)
}
- ignore("run Spark in yarn-cluster mode") {
+ test("run Spark in yarn-cluster mode") {
val main = YarnClusterDriver.getClass.getName().stripSuffix("$")
var result = File.createTempFile("result", null, tempDir)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org