You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2022/07/27 05:26:09 UTC

[spark] branch master updated: [SPARK-39864][SQL] Lazily register ExecutionListenerBus

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 869fc2198a4 [SPARK-39864][SQL] Lazily register ExecutionListenerBus
869fc2198a4 is described below

commit 869fc2198a4bb51bc03dce36fb2b61a57fe3006e
Author: Josh Rosen <jo...@databricks.com>
AuthorDate: Wed Jul 27 14:25:54 2022 +0900

    [SPARK-39864][SQL] Lazily register ExecutionListenerBus
    
    ### What changes were proposed in this pull request?
    
    This PR modifies `ExecutionListenerManager` so that its `ExecutionListenerBus` SparkListener is lazily registered during the first `.register(QueryExecutionListener)` call (instead of eagerly registering it in the constructor).
    
    ### Why are the changes needed?
    
    This addresses a ListenerBus performance problem in applications with large numbers of short-lived SparkSessions.
    
    The `ExecutionListenerBus` SparkListener is unregistered by the ContextCleaner after its associated ExecutionListenerManager/SparkSession is garbage-collected (see #31839). If many sessions are rapidly created and destroyed but the driver GC doesn't run, then this can result in a large number of unused ExecutionListenerBus listeners being registered on the shared ListenerBus queue. This can cause performance problems in the ListenerBus because each listener invocation has some overhead.
    
    In one real-world application with a very large driver heap and a high rate of SparkSession creation (hundreds per minute), I saw 5000 idle ExecutionListenerBus listeners, resulting in ~50ms median event processing times on the shared listener queue.
    
    This patch avoids this problem by making the listener registration lazy: if a short-lived SparkSession never uses QueryExecutionListeners then we won't register the ExecutionListenerBus and won't incur these overheads.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Added a new unit test.
    
    Closes #37282 from JoshRosen/SPARK-39864.
    
    Authored-by: Josh Rosen <jo...@databricks.com>
    Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
 .../spark/sql/util/QueryExecutionListener.scala      | 20 ++++++++++++++------
 .../sql/util/ExecutionListenerManagerSuite.scala     | 15 +++++++++++++++
 2 files changed, 29 insertions(+), 6 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala
index 7ac06a5cd7e..45482f12f3c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala
@@ -81,7 +81,10 @@ class ExecutionListenerManager private[sql](
     loadExtensions: Boolean)
   extends Logging {
 
-  private val listenerBus = new ExecutionListenerBus(this, session)
+  // SPARK-39864: lazily create the listener bus on the first register() call in order to
+  // avoid listener overheads when QueryExecutionListeners aren't used:
+  private val listenerBusInitializationLock = new Object()
+  @volatile private var listenerBus: Option[ExecutionListenerBus] = None
 
   if (loadExtensions) {
     val conf = session.sparkContext.conf
@@ -97,7 +100,12 @@ class ExecutionListenerManager private[sql](
    */
   @DeveloperApi
   def register(listener: QueryExecutionListener): Unit = {
-    listenerBus.addListener(listener)
+    listenerBusInitializationLock.synchronized {
+      if (listenerBus.isEmpty) {
+        listenerBus = Some(new ExecutionListenerBus(this, session))
+      }
+    }
+    listenerBus.get.addListener(listener)
   }
 
   /**
@@ -105,7 +113,7 @@ class ExecutionListenerManager private[sql](
    */
   @DeveloperApi
   def unregister(listener: QueryExecutionListener): Unit = {
-    listenerBus.removeListener(listener)
+    listenerBus.foreach(_.removeListener(listener))
   }
 
   /**
@@ -113,12 +121,12 @@ class ExecutionListenerManager private[sql](
    */
   @DeveloperApi
   def clear(): Unit = {
-    listenerBus.removeAllListeners()
+    listenerBus.foreach(_.removeAllListeners())
   }
 
   /** Only exposed for testing. */
   private[sql] def listListeners(): Array[QueryExecutionListener] = {
-    listenerBus.listeners.asScala.toArray
+    listenerBus.map(_.listeners.asScala.toArray).getOrElse(Array.empty[QueryExecutionListener])
   }
 
   /**
@@ -127,7 +135,7 @@ class ExecutionListenerManager private[sql](
   private[sql] def clone(session: SparkSession, sqlConf: SQLConf): ExecutionListenerManager = {
     val newListenerManager =
       new ExecutionListenerManager(session, sqlConf, loadExtensions = false)
-    listenerBus.listeners.asScala.foreach(newListenerManager.register)
+    listenerBus.foreach(_.listeners.asScala.foreach(newListenerManager.register))
     newListenerManager
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/util/ExecutionListenerManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/util/ExecutionListenerManagerSuite.scala
index 2ab733eac0b..56219766f70 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/util/ExecutionListenerManagerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/util/ExecutionListenerManagerSuite.scala
@@ -69,6 +69,21 @@ class ExecutionListenerManagerSuite extends SparkFunSuite with LocalSparkSession
     assert(INSTANCE_COUNT.get() === 1)
     assert(CALLBACK_COUNT.get() === 2)
   }
+
+  test("SPARK-39864 ExecutionListenerBus is lazily registered") {
+    spark = SparkSession.builder().master("local").appName("test").getOrCreate()
+    // Run a query to trigger the lazy initialization of the session state:
+    spark.sql("select 1").collect()
+    // The ExecutionListenerBus shouldn't be registered since no QueryExecutionListeners
+    // are registered:
+    assert(spark.sparkContext.listenerBus.findListenersByClass[ExecutionListenerBus]().isEmpty)
+    // Registering the first query execution listener registers a listener bus:
+    spark.listenerManager.register(new CountingQueryExecutionListener)
+    assert(spark.sparkContext.listenerBus.findListenersByClass[ExecutionListenerBus]().size == 1)
+    // Registering additional listeners reuses the same listener bus:
+    spark.listenerManager.register(new CountingQueryExecutionListener)
+    assert(spark.sparkContext.listenerBus.findListenersByClass[ExecutionListenerBus]().size == 1)
+  }
 }
 
 private class CountingQueryExecutionListener extends QueryExecutionListener {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org