You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2023/09/04 00:37:12 UTC
[spark] branch branch-3.5 updated: [SPARK-45061][SS][CONNECT] Clean up Running python StreamingQueryLIstener processes when session expires
This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push:
new fb5495f64ff [SPARK-45061][SS][CONNECT] Clean up Running python StreamingQueryLIstener processes when session expires
fb5495f64ff is described below
commit fb5495f64ffdedf3006a6f1a66cda128e164ad32
Author: Wei Liu <we...@databricks.com>
AuthorDate: Mon Sep 4 09:36:49 2023 +0900
[SPARK-45061][SS][CONNECT] Clean up Running python StreamingQueryLIstener processes when session expires
### What changes were proposed in this pull request?
Clean up all running Python StreamingQueryListener processes when the session expires.
### Why are the changes needed?
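Improvement: Python StreamingQueryListener processes started on the server for a session should not outlive that session, so they are now stopped (and their listeners removed) when the session expires.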
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Tests will be added in SPARK-44462. Currently there is no way to test this, because the session never expires: the Python listener process started on the server establishes a connection to the server process using the same session ID and pings it continuously, which keeps the session alive.
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #42687 from WweiL/SPARK-44433-followup-listener-cleanup.
Authored-by: Wei Liu <we...@databricks.com>
Signed-off-by: Hyukjin Kwon <gu...@apache.org>
(cherry picked from commit 7a01ba65b7408bc3b907aa7b0b27279913caafe9)
Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
.../sql/connect/planner/SparkConnectPlanner.scala | 4 +++-
.../spark/sql/connect/service/SessionHolder.scala | 21 ++++++++++++++++-----
2 files changed, 19 insertions(+), 6 deletions(-)
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index 46c465e4deb..2abbacc5a9b 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -2902,7 +2902,9 @@ class SparkConnectPlanner(val sessionHolder: SessionHolder) extends Logging {
SparkConnectService.streamingSessionManager.registerNewStreamingQuery(sessionHolder, query)
// Register the runner with the query if Python foreachBatch is enabled.
foreachBatchRunnerCleaner.foreach { cleaner =>
- sessionHolder.streamingRunnerCleanerCache.registerCleanerForQuery(query, cleaner)
+ sessionHolder.streamingForeachBatchRunnerCleanerCache.registerCleanerForQuery(
+ query,
+ cleaner)
}
executeHolder.eventsManager.postFinished()
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala
index 2034a97fce9..1cef02d7e34 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala
@@ -57,7 +57,7 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio
new ConcurrentHashMap()
// Handles Python process clean up for streaming queries. Initialized on first use in a query.
- private[connect] lazy val streamingRunnerCleanerCache =
+ private[connect] lazy val streamingForeachBatchRunnerCleanerCache =
new StreamingForeachBatchHelper.CleanerCache(this)
/** Add ExecuteHolder to this session. Called only by SparkConnectExecutionManager. */
@@ -160,7 +160,8 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio
eventManager.postClosed()
// Clean up running queries
SparkConnectService.streamingSessionManager.cleanupRunningQueries(this)
- streamingRunnerCleanerCache.cleanUpAll() // Clean up any streaming workers.
+ streamingForeachBatchRunnerCleanerCache.cleanUpAll() // Clean up any streaming workers.
+ removeAllListeners() // removes all listener and stop python listener processes if necessary.
}
/**
@@ -237,11 +238,21 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio
* Spark Connect PythonStreamingQueryListener.
*/
private[connect] def removeCachedListener(id: String): Unit = {
- listenerCache.get(id) match {
- case pyListener: PythonStreamingQueryListener => pyListener.stopListenerProcess()
+ Option(listenerCache.remove(id)) match {
+ case Some(pyListener: PythonStreamingQueryListener) => pyListener.stopListenerProcess()
case _ => // do nothing
}
- listenerCache.remove(id)
+ }
+
+ /**
+ * Stop all streaming listener threads, and removes all python process if applicable. Only
+ * called when session is expired.
+ */
+ private def removeAllListeners(): Unit = {
+ listenerCache.forEach((id, listener) => {
+ session.streams.removeListener(listener)
+ removeCachedListener(id)
+ })
}
/**
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org