You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Andrew Or (JIRA)" <ji...@apache.org> on 2015/04/14 23:02:58 UTC
[jira] [Closed] (SPARK-6769) Usage of the ListenerBus in
YarnClusterSuite is wrong
[ https://issues.apache.org/jira/browse/SPARK-6769?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Andrew Or closed SPARK-6769.
----------------------------
Resolution: Fixed
Fix Version/s: 1.4.0
Assignee: Kousuke Saruta
> Usage of the ListenerBus in YarnClusterSuite is wrong
> -----------------------------------------------------
>
> Key: SPARK-6769
> URL: https://issues.apache.org/jira/browse/SPARK-6769
> Project: Spark
> Issue Type: Bug
> Components: Tests, YARN
> Affects Versions: 1.4.0
> Reporter: Kousuke Saruta
> Assignee: Kousuke Saruta
> Priority: Minor
> Fix For: 1.4.0
>
>
> In YarnClusterSuite, a test case uses `SaveExecutorInfo` to handle ExecutorAddedEvent as follows.
> {code}
> private class SaveExecutorInfo extends SparkListener {
> val addedExecutorInfos = mutable.Map[String, ExecutorInfo]()
> override def onExecutorAdded(executor: SparkListenerExecutorAdded) {
> addedExecutorInfos(executor.executorId) = executor.executorInfo
> }
> }
> ...
> listener = new SaveExecutorInfo
> val sc = new SparkContext(new SparkConf()
> .setAppName("yarn \"test app\" 'with quotes' and \\back\\slashes and $dollarSigns"))
> sc.addSparkListener(listener)
> val status = new File(args(0))
> var result = "failure"
> try {
> val data = sc.parallelize(1 to 4, 4).collect().toSet
> assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
> data should be (Set(1, 2, 3, 4))
> result = "success"
> } finally {
> sc.stop()
> Files.write(result, status, UTF_8)
> }
> {code}
> But, the usage is wrong because Executors will spawn during initializing SparkContext and SparkContext#addSparkListener should be invoked after the initialization, thus after Executors spawn, so SaveExecutorInfo cannot handle ExecutorAddedEvent.
> Following code refers the result of the handling ExecutorAddedEvent. Because of the reason above, we cannot reach the assertion.
> {code}
> // verify log urls are present
> listener.addedExecutorInfos.values.foreach { info =>
> assert(info.logUrlMap.nonEmpty)
> }
> {code}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org