You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by jt...@apache.org on 2019/09/05 21:28:21 UTC

[spark] branch master updated: [SPARK-26046][SS] Add StreamingQueryManager.listListeners()

This is an automated email from the ASF dual-hosted git repository.

jtorres pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 3929d16  [SPARK-26046][SS] Add StreamingQueryManager.listListeners()
3929d16 is described below

commit 3929d166043deb104dc3f3180ab43be54c50937d
Author: Mukul Murthy <mu...@gmail.com>
AuthorDate: Thu Sep 5 14:27:54 2019 -0700

    [SPARK-26046][SS] Add StreamingQueryManager.listListeners()
    
    ### What changes were proposed in this pull request?
    
    Add a listListeners() method to StreamingQueryManager that lists all StreamingQueryListeners that have been added to that manager.
    
    ### Why are the changes needed?
    
    While it is best practice to keep a handle on every listener you add, it is still useful to have an API that lists the listeners that have been added to a StreamingQueryManager.
    
    ### Does this PR introduce any user-facing change?
    
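    No change to existing behavior. The only user-facing addition is the new public method StreamingQueryManager.listListeners().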
    
    ### How was this patch tested?
    
    Modified existing unit tests to use the new API instead of using reflection.
    
    Closes #25518 from mukulmurthy/26046-listener.
    
    Authored-by: Mukul Murthy <mu...@gmail.com>
    Signed-off-by: Jose Torres <to...@gmail.com>
---
 .../spark/sql/streaming/StreamingQueryManager.scala       | 10 ++++++++++
 .../spark/sql/streaming/StreamingQueryListenerSuite.scala | 15 ++++-----------
 2 files changed, 14 insertions(+), 11 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
index abee5f6..9765956 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
@@ -21,6 +21,7 @@ import java.util.UUID
 import java.util.concurrent.TimeUnit
 import javax.annotation.concurrent.GuardedBy
 
+import scala.collection.JavaConverters._
 import scala.collection.mutable
 
 import org.apache.hadoop.fs.Path
@@ -199,6 +200,15 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
     listenerBus.removeListener(listener)
   }
 
+  /**
+   * List all [[StreamingQueryListener]]s attached to this [[StreamingQueryManager]].
+   *
+   * @since 3.0.0
+   */
+  def listListeners(): Array[StreamingQueryListener] = {
+    listenerBus.listeners.asScala.toArray
+  }
+
   /** Post a listener event */
   private[sql] def postListenerEvent(event: StreamingQueryListener.Event): Unit = {
     listenerBus.post(event)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
index 422223b..d964048 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
@@ -47,7 +47,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
   after {
     spark.streams.active.foreach(_.stop())
     assert(spark.streams.active.isEmpty)
-    assert(addedListeners().isEmpty)
+    assert(spark.streams.listListeners().isEmpty)
     // Make sure we don't leak any events to the next test
     spark.sparkContext.listenerBus.waitUntilEmpty(10000)
   }
@@ -223,7 +223,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
       assert(isListenerActive(listener1) === false)
       assert(isListenerActive(listener2))
     } finally {
-      addedListeners().foreach(spark.streams.removeListener)
+      spark.streams.listListeners().foreach(spark.streams.removeListener)
     }
   }
 
@@ -362,10 +362,10 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
     assert(session1.streams.ne(session2.streams))
 
     withListenerAdded(collector1, session1) {
-      assert(addedListeners(session1).nonEmpty)
+      assert(session1.streams.listListeners().nonEmpty)
 
       withListenerAdded(collector2, session2) {
-        assert(addedListeners(session2).nonEmpty)
+        assert(session2.streams.listListeners().nonEmpty)
 
         // query on session1 should send events only to collector1
         runQuery(session1)
@@ -440,13 +440,6 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
     }
   }
 
-  private def addedListeners(session: SparkSession = spark): Array[StreamingQueryListener] = {
-    val listenerBusMethod =
-      PrivateMethod[StreamingQueryListenerBus]('listenerBus)
-    val listenerBus = session.streams invokePrivate listenerBusMethod()
-    listenerBus.listeners.toArray.map(_.asInstanceOf[StreamingQueryListener])
-  }
-
   /** Collects events from the StreamingQueryListener for testing */
   class EventCollector extends StreamingQueryListener {
     // to catch errors in the async listener events


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org