You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2023/09/04 00:37:12 UTC

[spark] branch branch-3.5 updated: [SPARK-45061][SS][CONNECT] Clean up Running python StreamingQueryListener processes when session expires

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new fb5495f64ff [SPARK-45061][SS][CONNECT] Clean up Running python StreamingQueryListener processes when session expires
fb5495f64ff is described below

commit fb5495f64ffdedf3006a6f1a66cda128e164ad32
Author: Wei Liu <we...@databricks.com>
AuthorDate: Mon Sep 4 09:36:49 2023 +0900

    [SPARK-45061][SS][CONNECT] Clean up Running python StreamingQueryListener processes when session expires
    
    ### What changes were proposed in this pull request?
    
    Clean up all running Python StreamingQueryListener processes when the session expires.
    
    ### Why are the changes needed?
    
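    Improvement: before this change, session cleanup did not stop the Python listener processes, so they could be left running after the session expired.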
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Tests will be added in SPARK-44462. Currently there is no way to test this because the session never expires: the Python listener process started on the server establishes a connection to the server process using the same session id and pings it continuously, which keeps the session alive.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #42687 from WweiL/SPARK-44433-followup-listener-cleanup.
    
    Authored-by: Wei Liu <we...@databricks.com>
    Signed-off-by: Hyukjin Kwon <gu...@apache.org>
    (cherry picked from commit 7a01ba65b7408bc3b907aa7b0b27279913caafe9)
    Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
 .../sql/connect/planner/SparkConnectPlanner.scala   |  4 +++-
 .../spark/sql/connect/service/SessionHolder.scala   | 21 ++++++++++++++++-----
 2 files changed, 19 insertions(+), 6 deletions(-)

diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index 46c465e4deb..2abbacc5a9b 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -2902,7 +2902,9 @@ class SparkConnectPlanner(val sessionHolder: SessionHolder) extends Logging {
     SparkConnectService.streamingSessionManager.registerNewStreamingQuery(sessionHolder, query)
     // Register the runner with the query if Python foreachBatch is enabled.
     foreachBatchRunnerCleaner.foreach { cleaner =>
-      sessionHolder.streamingRunnerCleanerCache.registerCleanerForQuery(query, cleaner)
+      sessionHolder.streamingForeachBatchRunnerCleanerCache.registerCleanerForQuery(
+        query,
+        cleaner)
     }
     executeHolder.eventsManager.postFinished()
 
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala
index 2034a97fce9..1cef02d7e34 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala
@@ -57,7 +57,7 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio
     new ConcurrentHashMap()
 
   // Handles Python process clean up for streaming queries. Initialized on first use in a query.
-  private[connect] lazy val streamingRunnerCleanerCache =
+  private[connect] lazy val streamingForeachBatchRunnerCleanerCache =
     new StreamingForeachBatchHelper.CleanerCache(this)
 
   /** Add ExecuteHolder to this session. Called only by SparkConnectExecutionManager. */
@@ -160,7 +160,8 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio
     eventManager.postClosed()
     // Clean up running queries
     SparkConnectService.streamingSessionManager.cleanupRunningQueries(this)
-    streamingRunnerCleanerCache.cleanUpAll() // Clean up any streaming workers.
+    streamingForeachBatchRunnerCleanerCache.cleanUpAll() // Clean up any streaming workers.
+    removeAllListeners() // removes all listener and stop python listener processes if necessary.
   }
 
   /**
@@ -237,11 +238,21 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio
    * Spark Connect PythonStreamingQueryListener.
    */
   private[connect] def removeCachedListener(id: String): Unit = {
-    listenerCache.get(id) match {
-      case pyListener: PythonStreamingQueryListener => pyListener.stopListenerProcess()
+    Option(listenerCache.remove(id)) match {
+      case Some(pyListener: PythonStreamingQueryListener) => pyListener.stopListenerProcess()
       case _ => // do nothing
     }
-    listenerCache.remove(id)
+  }
+
+  /**
+   * Stop all streaming listener threads, and removes all python process if applicable. Only
+   * called when session is expired.
+   */
+  private def removeAllListeners(): Unit = {
+    listenerCache.forEach((id, listener) => {
+      session.streams.removeListener(listener)
+      removeCachedListener(id)
+    })
   }
 
   /**


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org