Posted to commits@spark.apache.org by an...@apache.org on 2015/04/14 23:01:59 UTC

spark git commit: [SPARK-6769][YARN][TEST] Usage of the ListenerBus in YarnClusterSuite is wrong

Repository: spark
Updated Branches:
  refs/heads/master 65774370a -> 4d4b24927


[SPARK-6769][YARN][TEST] Usage of the ListenerBus in YarnClusterSuite is wrong

In YarnClusterSuite, a test case uses `SaveExecutorInfo` to handle `SparkListenerExecutorAdded` events as follows.

```
private class SaveExecutorInfo extends SparkListener {
  val addedExecutorInfos = mutable.Map[String, ExecutorInfo]()

  override def onExecutorAdded(executor: SparkListenerExecutorAdded) {
    addedExecutorInfos(executor.executorId) = executor.executorInfo
  }
}

...

    listener = new SaveExecutorInfo
    val sc = new SparkContext(new SparkConf()
      .setAppName("yarn \"test app\" 'with quotes' and \\back\\slashes and $dollarSigns"))
    sc.addSparkListener(listener)
    val status = new File(args(0))
    var result = "failure"
    try {
      val data = sc.parallelize(1 to 4, 4).collect().toSet
      assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
      data should be (Set(1, 2, 3, 4))
      result = "success"
    } finally {
      sc.stop()
      Files.write(result, status, UTF_8)
    }
```

But this usage is wrong: executors spawn while the SparkContext is initializing, and `SparkContext#addSparkListener` can only be invoked after that initialization completes, i.e. after the executors have already spawned. By the time the listener is registered, the events have already been posted, so `SaveExecutorInfo` never handles any `SparkListenerExecutorAdded` event.
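
The patch below fixes this by registering the listener through the `spark.extraListeners` configuration property, so that SparkContext instantiates and attaches it during its own initialization, before any executor registers. A minimal sketch of that approach, using the test's own `SaveExecutorInfo` class:

```
val sc = new SparkContext(new SparkConf()
  // Registered via configuration, so the listener is attached while the
  // SparkContext is still initializing and sees every ExecutorAdded event.
  .set("spark.extraListeners", classOf[SaveExecutorInfo].getName)
  .setAppName("yarn \"test app\" 'with quotes' and \\back\\slashes and $dollarSigns"))
```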

The following code then inspects the result of handling those events. Because of the problem above, `addedExecutorInfos` is empty, the loop body never runs, and the assertion is never exercised; the check passes without verifying anything.

```
    // verify log urls are present
    listener.addedExecutorInfos.values.foreach { info =>
      assert(info.logUrlMap.nonEmpty)
    }
```
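
With the listener created by SparkContext itself, the test can no longer hold a direct reference to it. The patch therefore adds a `findListenersByClass` helper to `ListenerBus` and retrieves the instance back, which also makes it possible to assert that executor-added events were actually captured. A sketch mirroring the fixed driver code:

```
val listeners = sc.listenerBus.findListenersByClass[SaveExecutorInfo]
assert(listeners.size === 1)
val executorInfos = listeners(0).addedExecutorInfos.values
// Guard against the silent-pass problem above: the map must be
// non-empty before the per-executor log URL checks mean anything.
assert(executorInfos.nonEmpty)
executorInfos.foreach { info =>
  assert(info.logUrlMap.nonEmpty)
}
```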

Author: Kousuke Saruta <sa...@oss.nttdata.co.jp>

Closes #5417 from sarutak/SPARK-6769 and squashes the following commits:

8adc8ba [Kousuke Saruta] Fixed compile error
e258530 [Kousuke Saruta] Fixed style
591cf3e [Kousuke Saruta] Fixed style
48ec89a [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-6769
860c965 [Kousuke Saruta] Simplified code
207d325 [Kousuke Saruta] Added findListenersByClass method to ListenerBus
2408c84 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-6769
2d7e409 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-6769
3874adf [Kousuke Saruta] Fixed the usage of listener bus in LogUrlsStandaloneSuite
153a91b [Kousuke Saruta] Fixed the usage of listener bus in YarnClusterSuite


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4d4b2492
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4d4b2492
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4d4b2492

Branch: refs/heads/master
Commit: 4d4b24927417b2c17810e94d6d46c37491c68869
Parents: 6577437
Author: Kousuke Saruta <sa...@oss.nttdata.co.jp>
Authored: Tue Apr 14 14:00:49 2015 -0700
Committer: Andrew Or <an...@databricks.com>
Committed: Tue Apr 14 14:01:55 2015 -0700

----------------------------------------------------------------------
 .../org/apache/spark/util/ListenerBus.scala     |  8 ++++++++
 .../spark/deploy/LogUrlsStandaloneSuite.scala   | 20 +++++++++++---------
 .../spark/deploy/yarn/YarnClusterSuite.scala    | 17 ++++++++++-------
 3 files changed, 29 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4d4b2492/core/src/main/scala/org/apache/spark/util/ListenerBus.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/ListenerBus.scala b/core/src/main/scala/org/apache/spark/util/ListenerBus.scala
index d60b8b9..a725767 100644
--- a/core/src/main/scala/org/apache/spark/util/ListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/util/ListenerBus.scala
@@ -19,9 +19,12 @@ package org.apache.spark.util
 
 import java.util.concurrent.CopyOnWriteArrayList
 
+import scala.collection.JavaConversions._
+import scala.reflect.ClassTag
 import scala.util.control.NonFatal
 
 import org.apache.spark.Logging
+import org.apache.spark.scheduler.SparkListener
 
 /**
  * An event bus which posts events to its listeners.
@@ -64,4 +67,9 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {
    */
   def onPostEvent(listener: L, event: E): Unit
 
+  private[spark] def findListenersByClass[T <: L : ClassTag](): Seq[T] = {
+    val c = implicitly[ClassTag[T]].runtimeClass
+    listeners.filter(_.getClass == c).map(_.asInstanceOf[T]).toSeq
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/4d4b2492/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala b/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala
index 9cdb428..c93d16f 100644
--- a/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.deploy
 
 import java.net.URL
 
+import scala.collection.JavaConversions._
 import scala.collection.mutable
 import scala.io.Source
 
@@ -65,16 +66,17 @@ class LogUrlsStandaloneSuite extends FunSuite with LocalSparkContext {
         new MySparkConf().setAll(getAll)
       }
     }
-    val conf = new MySparkConf()
+    val conf = new MySparkConf().set(
+      "spark.extraListeners", classOf[SaveExecutorInfo].getName)
     sc = new SparkContext("local-cluster[2,1,512]", "test", conf)
 
-    val listener = new SaveExecutorInfo
-    sc.addSparkListener(listener)
-
     // Trigger a job so that executors get added
     sc.parallelize(1 to 100, 4).map(_.toString).count()
 
     assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+    val listeners = sc.listenerBus.findListenersByClass[SaveExecutorInfo]
+    assert(listeners.size === 1)
+    val listener = listeners(0)
     listener.addedExecutorInfos.values.foreach { info =>
       assert(info.logUrlMap.nonEmpty)
       info.logUrlMap.values.foreach { logUrl =>
@@ -82,12 +84,12 @@ class LogUrlsStandaloneSuite extends FunSuite with LocalSparkContext {
       }
     }
   }
+}
 
-  private class SaveExecutorInfo extends SparkListener {
-    val addedExecutorInfos = mutable.Map[String, ExecutorInfo]()
+private[spark] class SaveExecutorInfo extends SparkListener {
+  val addedExecutorInfos = mutable.Map[String, ExecutorInfo]()
 
-    override def onExecutorAdded(executor: SparkListenerExecutorAdded) {
-      addedExecutorInfos(executor.executorId) = executor.executorInfo
-    }
+  override def onExecutorAdded(executor: SparkListenerExecutorAdded) {
+    addedExecutorInfos(executor.executorId) = executor.executorInfo
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/4d4b2492/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
----------------------------------------------------------------------
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index 76952e3..a18c94d 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -33,7 +33,7 @@ import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
 
 import org.apache.spark.{Logging, SparkConf, SparkContext, SparkException, TestUtils}
 import org.apache.spark.scheduler.cluster.ExecutorInfo
-import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorAdded}
+import org.apache.spark.scheduler.{SparkListenerJobStart, SparkListener, SparkListenerExecutorAdded}
 import org.apache.spark.util.Utils
 
 /**
@@ -282,10 +282,10 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit
 
 }
 
-private class SaveExecutorInfo extends SparkListener {
+private[spark] class SaveExecutorInfo extends SparkListener {
   val addedExecutorInfos = mutable.Map[String, ExecutorInfo]()
 
-  override def onExecutorAdded(executor : SparkListenerExecutorAdded) {
+  override def onExecutorAdded(executor: SparkListenerExecutorAdded) {
     addedExecutorInfos(executor.executorId) = executor.executorInfo
   }
 }
@@ -293,7 +293,6 @@ private class SaveExecutorInfo extends SparkListener {
 private object YarnClusterDriver extends Logging with Matchers {
 
   val WAIT_TIMEOUT_MILLIS = 10000
-  var listener: SaveExecutorInfo = null
 
   def main(args: Array[String]): Unit = {
     if (args.length != 1) {
@@ -306,10 +305,9 @@ private object YarnClusterDriver extends Logging with Matchers {
       System.exit(1)
     }
 
-    listener = new SaveExecutorInfo
     val sc = new SparkContext(new SparkConf()
+      .set("spark.extraListeners", classOf[SaveExecutorInfo].getName)
       .setAppName("yarn \"test app\" 'with quotes' and \\back\\slashes and $dollarSigns"))
-    sc.addSparkListener(listener)
     val status = new File(args(0))
     var result = "failure"
     try {
@@ -323,7 +321,12 @@ private object YarnClusterDriver extends Logging with Matchers {
     }
 
     // verify log urls are present
-    listener.addedExecutorInfos.values.foreach { info =>
+    val listeners = sc.listenerBus.findListenersByClass[SaveExecutorInfo]
+    assert(listeners.size === 1)
+    val listener = listeners(0)
+    val executorInfos = listener.addedExecutorInfos.values
+    assert(executorInfos.nonEmpty)
+    executorInfos.foreach { info =>
       assert(info.logUrlMap.nonEmpty)
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org