Posted to issues@spark.apache.org by "Andrew Or (JIRA)" <ji...@apache.org> on 2015/04/14 23:02:58 UTC

[jira] [Closed] (SPARK-6769) Usage of the ListenerBus in YarnClusterSuite is wrong

     [ https://issues.apache.org/jira/browse/SPARK-6769?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Andrew Or closed SPARK-6769.
----------------------------
       Resolution: Fixed
    Fix Version/s: 1.4.0
         Assignee: Kousuke Saruta

> Usage of the ListenerBus in YarnClusterSuite is wrong
> -----------------------------------------------------
>
>                 Key: SPARK-6769
>                 URL: https://issues.apache.org/jira/browse/SPARK-6769
>             Project: Spark
>          Issue Type: Bug
>          Components: Tests, YARN
>    Affects Versions: 1.4.0
>            Reporter: Kousuke Saruta
>            Assignee: Kousuke Saruta
>            Priority: Minor
>             Fix For: 1.4.0
>
>
> In YarnClusterSuite, a test case uses `SaveExecutorInfo` to handle ExecutorAddedEvent as follows.
> {code}
> private class SaveExecutorInfo extends SparkListener {
>   val addedExecutorInfos = mutable.Map[String, ExecutorInfo]()
>   override def onExecutorAdded(executor: SparkListenerExecutorAdded) {
>     addedExecutorInfos(executor.executorId) = executor.executorInfo
>   }
> }
> ...
>     listener = new SaveExecutorInfo
>     val sc = new SparkContext(new SparkConf()
>       .setAppName("yarn \"test app\" 'with quotes' and \\back\\slashes and $dollarSigns"))
>     sc.addSparkListener(listener)
>     val status = new File(args(0))
>     var result = "failure"
>     try {
>       val data = sc.parallelize(1 to 4, 4).collect().toSet
>       assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
>       data should be (Set(1, 2, 3, 4))
>       result = "success"
>     } finally {
>       sc.stop()
>       Files.write(result, status, UTF_8)
>     }
> {code}
> But this usage is wrong: executors are spawned while the SparkContext is being initialized, and SparkContext#addSparkListener can only be invoked after that initialization completes, i.e. after the executors have already spawned. As a result, SaveExecutorInfo cannot handle the ExecutorAddedEvent fired during startup.
> The following code refers to the results of handling ExecutorAddedEvent. For the reason above, `addedExecutorInfos` stays empty, so the assertion inside the loop is never reached and the verification passes vacuously without checking anything.
> {code}
>     // verify log urls are present
>     listener.addedExecutorInfos.values.foreach { info =>
>       assert(info.logUrlMap.nonEmpty)
>     }
> {code}
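>
> One way to avoid this race (a sketch of the general technique, not necessarily the exact patch merged for this issue) is to register the listener through the `spark.extraListeners` configuration, available since Spark 1.3. Listeners named there are instantiated during SparkContext initialization itself, before executors register, so no ExecutorAddedEvent is missed. The companion-object `instance` field below is a hypothetical helper for reading the results back in the test body:
> {code}
> import scala.collection.mutable
>
> import org.apache.spark.{SparkConf, SparkContext}
> import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorAdded}
> import org.apache.spark.scheduler.cluster.ExecutorInfo
>
> // Stash the reflectively-created instance so the test body can reach it later.
> object SaveExecutorInfo {
>   @volatile var instance: SaveExecutorInfo = _
> }
>
> // Must be a top-level class with a zero-arg constructor so that
> // spark.extraListeners can instantiate it reflectively.
> class SaveExecutorInfo extends SparkListener {
>   val addedExecutorInfos = mutable.Map[String, ExecutorInfo]()
>   SaveExecutorInfo.instance = this
>
>   override def onExecutorAdded(executor: SparkListenerExecutorAdded): Unit = {
>     addedExecutorInfos(executor.executorId) = executor.executorInfo
>   }
> }
>
> // The listener is attached while the context is still initializing,
> // so executor-added events fired during startup are captured.
> val sc = new SparkContext(new SparkConf()
>   .setAppName("yarn test app")
>   .set("spark.extraListeners", classOf[SaveExecutorInfo].getName))
> {code}
> With this wiring, the verification loop above can iterate over `SaveExecutorInfo.instance.addedExecutorInfos` and actually exercise the assertion.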



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org