Posted to reviews@spark.apache.org by "rangadi (via GitHub)" <gi...@apache.org> on 2023/08/11 21:00:26 UTC

[GitHub] [spark] rangadi opened a new pull request, #42460: [SPARK-44433] Terminate foreach batch runner when streaming query terminates

rangadi opened a new pull request, #42460:
URL: https://github.com/apache/spark/pull/42460

   ### What changes were proposed in this pull request?
   
   This terminates the Python worker created for `foreachBatch` when the streaming query terminates.
   All of the tracking is done inside the Connect server (inside `StreamingForeachBatchHelper`). How this works:
   
   * (A) The helper class returns a cleaner (an `AutoCloseable`) to the Connect server when the foreachBatch function is set up (this happens before starting the query).
   * (B) If the query fails to start, server directly invokes the cleaner.
   * (C) If the query starts up, the server registers the cleaner with `streamingRunnerCleanerCache` in the `SessionHolder`.
   * (D) The cache keeps a mapping from query to cleaner (a simplified sketch of this cache follows below).
   * It registers a streaming listener (only once per session), which invokes the cleaner when a query terminates.
   * There is also a final cleanup when the `SessionHolder` expires.
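   
   Below is a minimal, self-contained sketch of the (queryId, runId) -> cleaner cache described in (C)/(D). It mirrors the idea behind `CleanerCache` in `StreamingForeachBatchHelper`, but the names used here (`QueryKey`, `register`, `cleanupForQuery`, `cleanupAll`) and the omission of the listener wiring are illustrative simplifications, not the actual implementation.
   
   ```scala
   import java.util.concurrent.ConcurrentHashMap
   
   object CleanerCacheSketch {
     // Key for one run of a streaming query.
     final case class QueryKey(queryId: String, runId: String)
   
     private val cleaners = new ConcurrentHashMap[QueryKey, AutoCloseable]()
   
     def register(key: QueryKey, cleaner: AutoCloseable): Unit = {
       // Registering twice for the same run would indicate an internal bug.
       if (cleaners.putIfAbsent(key, cleaner) != null) {
         throw new IllegalStateException(s"Cleaner for $key is already registered")
       }
     }
   
     // In the real code this is driven by a streaming listener's onQueryTerminated.
     def cleanupForQuery(key: QueryKey): Unit =
       Option(cleaners.remove(key)).foreach(_.close())
   
     // Invoked when the session expires: close whatever is left.
     def cleanupAll(): Unit = {
       val it = cleaners.entrySet().iterator()
       while (it.hasNext) {
         it.next().getValue.close()
         it.remove()
       }
     }
   }
   ```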
   
   ### Why are the changes needed?
   This ensures that the Python process created for a streaming query is properly terminated when the query terminates.
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   
   ### How was this patch tested?
   - Unit tests are added for `CleanerCache`
   - Existing unit tests for foreachBatch.
   - Manual test to verify that the Python process is terminated. (How can we test this in a unit test?)
   




[GitHub] [spark] hvanhovell closed pull request #42460: [SPARK-44433][3.5x] Terminate foreach batch runner when streaming query terminates

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell closed pull request #42460: [SPARK-44433][3.5x] Terminate foreach batch runner when streaming query terminates
URL: https://github.com/apache/spark/pull/42460




[GitHub] [spark] rangadi commented on a diff in pull request #42460: [SPARK-44433] Terminate foreach batch runner when streaming query terminates

Posted by "rangadi (via GitHub)" <gi...@apache.org>.
rangadi commented on code in PR #42460:
URL: https://github.com/apache/spark/pull/42460#discussion_r1296263054


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingForeachBatchHelper.scala:
##########
@@ -102,6 +118,9 @@ object StreamingForeachBatchHelper extends Logging {
       //     This is because MicroBatch execution clones the session during start.
       //     The session attached to the foreachBatch dataframe is different from the one the one
       //     the query was started with. `sessionHolder` here contains the latter.
+      //     Another issue with not creating new session id: foreachBatch worker keeps
+      //     the session alive. The session mapping at Connect server does not expire and query
+      //     keeps running even if the original client disappears. This keeps the query running.

Review Comment:
   FYI: @juliuszsompolski, @WweiL, @bogao007
   Will do this in the next follow-up.





[GitHub] [spark] WweiL commented on a diff in pull request #42460: [SPARK-44433] Terminate foreach batch runner when streaming query terminates

Posted by "WweiL (via GitHub)" <gi...@apache.org>.
WweiL commented on code in PR #42460:
URL: https://github.com/apache/spark/pull/42460#discussion_r1293777160


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -2850,16 +2857,26 @@ class SparkConnectPlanner(val sessionHolder: SessionHolder) extends Logging {
       writer.foreachBatch(foreachBatchFn)
     }
 
-    val query = writeOp.getPath match {
-      case "" if writeOp.hasTableName => writer.toTable(writeOp.getTableName)
-      case "" => writer.start()
-      case path => writer.start(path)
-    }
+    val query =
+      try {
+        writeOp.getPath match {
+          case "" if writeOp.hasTableName => writer.toTable(writeOp.getTableName)
+          case "" => writer.start()
+          case path => writer.start(path)
+        }
+      } catch {
+        case NonFatal(ex) => // Failed to start the query, clean up foreach runner if any.

Review Comment:
   Can you explain why we only catch `NonFatal` here?



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -2850,16 +2857,26 @@ class SparkConnectPlanner(val sessionHolder: SessionHolder) extends Logging {
       writer.foreachBatch(foreachBatchFn)
     }
 
-    val query = writeOp.getPath match {
-      case "" if writeOp.hasTableName => writer.toTable(writeOp.getTableName)
-      case "" => writer.start()
-      case path => writer.start(path)
-    }
+    val query =
+      try {
+        writeOp.getPath match {
+          case "" if writeOp.hasTableName => writer.toTable(writeOp.getTableName)
+          case "" => writer.start()
+          case path => writer.start(path)
+        }
+      } catch {
+        case NonFatal(ex) => // Failed to start the query, clean up foreach runner if any.
+          foreachBatchRunnerCleaner.foreach(_.close())
+          throw ex
+      }
 
     // Register the new query so that the session and query references are cached.
-    SparkConnectService.streamingSessionManager.registerNewStreamingQuery(
-      sessionHolder = SessionHolder(userId = userId, sessionId = sessionId, session),
-      query = query)
+    SparkConnectService.streamingSessionManager.registerNewStreamingQuery(sessionHolder, query)
+    // Register the runner with the query if Python foreachBatch is enabled.
+    foreachBatchRunnerCleaner.foreach { cleaner =>
+      sessionHolder.streamingRunnerCleanerCache.registerCleanerForQuery(query, cleaner)
+    }
+    // Register the new query so that the session and query references are cached.

Review Comment:
   I think this comment needs to be deleted



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala:
##########
@@ -153,7 +158,7 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio
     logDebug(s"Expiring session with userId: $userId and sessionId: $sessionId")
     artifactManager.cleanUpResources()
     eventManager.postClosed()
-
+    streamingRunnerCleanerCache.cleanUpAll()

Review Comment:
   I think it's better to put this below 
   ```
   SparkConnectService.streamingSessionManager.cleanupRunningQueries(this)
   ```
   so we first stop the queries, then stop the Python processes.
   
   That way, if there are still queries running, it would prevent a lot of errors from being thrown. I think it's better even if the errors are caught anyway.
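   
   A tiny, self-contained illustration of that ordering (here `QueryHandle` and the two sequences are stand-ins for `SparkConnectService.streamingSessionManager` and `streamingRunnerCleanerCache`, so this is a sketch of the suggestion rather than the actual session-expiration code):
   
   ```scala
   // Stand-in for a running streaming query.
   final case class QueryHandle(id: String) {
     def stop(): Unit = println(s"stopping query $id")
   }
   
   def expireSession(runningQueries: Seq[QueryHandle], workerCleaners: Seq[AutoCloseable]): Unit = {
     runningQueries.foreach(_.stop())   // 1. stop the queries first (cleanupRunningQueries)
     workerCleaners.foreach(_.close())  // 2. then stop the Python workers (cleanUpAll)
   }
   ```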



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingForeachBatchHelper.scala:
##########
@@ -31,6 +38,17 @@ object StreamingForeachBatchHelper extends Logging {
 
   type ForeachBatchFnType = (DataFrame, Long) => Unit
 
+  case class RunnerCleaner(runner: StreamingPythonRunner) extends AutoCloseable {
+    override def close(): Unit = {
+      try runner.stop()
+      catch {
+        case NonFatal(ex) =>
+          logWarning("Error while stopping streaming Python worker", ex)
+        // Exception is not propagated.

Review Comment:
   indentation?
   
   I'm not sure what "exception is not propagated" means, but [PythonRunner also just logs this](https://github.com/apache/spark/blob/bf1dc9fd650747baa6abf16b5f7c13652362556d/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala#L214-L221)
   
   Also, may I know if there is any specific reason we only catch `NonFatal` errors?



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamingQueryCache.scala:
##########
@@ -180,13 +185,16 @@ private[connect] class SparkConnectStreamingQueryCache(
           case Some(_) => // Inactive query waiting for expiration. Do nothing.
             logInfo(s"Waiting for the expiration for $id in session ${v.sessionId}")
 
-          case None => // Active query, check if it is stopped. Keep the session alive.
+          case None => // Active query, check if it is stopped. Enable timeout if it is stopped.
             val isActive = v.query.isActive && Option(v.session.streams.get(id)).nonEmpty
 
             if (!isActive) {
               logInfo(s"Marking query $id in session ${v.sessionId} inactive.")
               val expiresAtMs = nowMs + stoppedQueryInactivityTimeout.toMillis
               queryCache.put(k, v.copy(expiresAtMs = Some(expiresAtMs)))
+              // To consider: Clean up any runner registered for this query with the session holder

Review Comment:
   Should we file a ticket for this?





[GitHub] [spark] bogao007 commented on a diff in pull request #42460: [SPARK-44433] Terminate foreach batch runner when streaming query terminates

Posted by "bogao007 (via GitHub)" <gi...@apache.org>.
bogao007 commented on code in PR #42460:
URL: https://github.com/apache/spark/pull/42460#discussion_r1293994248


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingForeachBatchHelper.scala:
##########
@@ -111,17 +128,79 @@ object StreamingForeachBatchHelper extends Logging {
       logInfo(s"Python foreach batch for dfId ${args.dfId} completed (ret: $ret)")
     }
 
-    dataFrameCachingWrapper(foreachBatchRunnerFn, sessionHolder)
+    (dataFrameCachingWrapper(foreachBatchRunnerFn, sessionHolder), RunnerCleaner(runner))
   }
 
-  // TODO(SPARK-44433): Improve termination of Processes
-  //   The goal is that when a query is terminated, the python process associated with foreachBatch
-  //   should be terminated. One way to do that is by registering streaming query listener:
-  //   After pythonForeachBatchWrapper() is invoked by the SparkConnectPlanner.
-  //   At that time, we don't have the streaming queries yet.
-  //   Planner should call back into this helper with the query id when it starts it immediately
-  //   after. Save the query id to StreamingPythonRunner mapping. This mapping should be
-  //   part of the SessionHolder.
-  //   When a query is terminated, check the mapping and terminate any associated runner.
-  //   These runners should be terminated when a session is deleted (due to timeout, etc).
+  /**
+   * This manages cache from queries to cleaner for runners used for streaming queries. This is
+   * used in [[SessionHolder]].
+   */
+  class CleanerCache(sessionHolder: SessionHolder) {
+
+    private case class CacheKey(queryId: String, runId: String)
+
+    // Mapping from streaming (queryId, runId) to runner cleaner. Used for Python foreachBatch.
+    private val cleanerCache: ConcurrentMap[CacheKey, AutoCloseable] = new ConcurrentHashMap()
+
+    private lazy val streamingListener = { // Initialized on first registered query
+      val listener = new StreamingRunnerCleanerListener
+      sessionHolder.session.streams.addListener(listener)
+      logInfo(s"Registered runner clean up listener for session ${sessionHolder.sessionId}")
+      listener
+    }
+
+    private[connect] def registerCleanerForQuery(
+        query: StreamingQuery,
+        cleaner: AutoCloseable): Unit = {
+
+      streamingListener // Access to initialize
+      val key = CacheKey(query.id.toString, query.runId.toString)
+
+      Option(cleanerCache.putIfAbsent(key, cleaner)) match {
+        case Some(_) =>
+          throw new IllegalStateException(s"Unexpected: a cleaner for query $key is already set")

Review Comment:
   Ah OK, then it should be fine, thanks!





[GitHub] [spark] rangadi commented on a diff in pull request #42460: [SPARK-44433] Terminate foreach batch runner when streaming query terminates

Posted by "rangadi (via GitHub)" <gi...@apache.org>.
rangadi commented on code in PR #42460:
URL: https://github.com/apache/spark/pull/42460#discussion_r1296618039


##########
core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala:
##########
@@ -109,7 +109,10 @@ private[spark] class StreamingPythonRunner(
    */
   def stop(): Unit = {
     pythonWorker.foreach { worker =>
-      SparkEnv.get.destroyPythonWorker(pythonExec, workerModule, envVars.asScala.toMap, worker)
+      pythonWorkerFactory.foreach { factory =>

Review Comment:
   PTAL @WweiL & @bogao007. I have manually tested. Even with the NPE issue that Bo mentioned, the process is getting removed. 





[GitHub] [spark] rangadi commented on a diff in pull request #42460: [SPARK-44433] Terminate foreach batch runner when streaming query terminates

Posted by "rangadi (via GitHub)" <gi...@apache.org>.
rangadi commented on code in PR #42460:
URL: https://github.com/apache/spark/pull/42460#discussion_r1296652197


##########
core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala:
##########
@@ -36,7 +36,7 @@ import org.apache.spark.util.{RedirectThread, Utils}
 
 case class PythonWorker(channel: SocketChannel, selector: Selector, selectionKey: SelectionKey) {
   def stop(): Unit = {
-    selectionKey.cancel()
+    Option(selectionKey).foreach(_.cancel())

Review Comment:
   @bogao007 this fixes the NPE.
   cc: @utkarsh39 
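   
   For context, a standalone illustration of why wrapping in `Option(...)` avoids the NPE: `Option(x)` yields `None` when `x` is `null` (unlike `Some(x)`), so `cancel()` is simply skipped. This is an illustration, not the PR code itself:
   
   ```scala
   import java.nio.channels.SelectionKey
   
   // No-op when the selection key was never initialized (i.e. is null).
   def cancelSafely(selectionKey: SelectionKey): Unit =
     Option(selectionKey).foreach(_.cancel())
   ```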





[GitHub] [spark] bogao007 commented on a diff in pull request #42460: [SPARK-44433] Terminate foreach batch runner when streaming query terminates

Posted by "bogao007 (via GitHub)" <gi...@apache.org>.
bogao007 commented on code in PR #42460:
URL: https://github.com/apache/spark/pull/42460#discussion_r1293924533


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala:
##########
@@ -153,7 +158,7 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio
     logDebug(s"Expiring session with userId: $userId and sessionId: $sessionId")
     artifactManager.cleanUpResources()
     eventManager.postClosed()
-
+    streamingRunnerCleanerCache.cleanUpAll()

Review Comment:
   +1



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingForeachBatchHelper.scala:
##########
@@ -111,17 +128,79 @@ object StreamingForeachBatchHelper extends Logging {
       logInfo(s"Python foreach batch for dfId ${args.dfId} completed (ret: $ret)")
     }
 
-    dataFrameCachingWrapper(foreachBatchRunnerFn, sessionHolder)
+    (dataFrameCachingWrapper(foreachBatchRunnerFn, sessionHolder), RunnerCleaner(runner))
   }
 
-  // TODO(SPARK-44433): Improve termination of Processes
-  //   The goal is that when a query is terminated, the python process associated with foreachBatch
-  //   should be terminated. One way to do that is by registering streaming query listener:
-  //   After pythonForeachBatchWrapper() is invoked by the SparkConnectPlanner.
-  //   At that time, we don't have the streaming queries yet.
-  //   Planner should call back into this helper with the query id when it starts it immediately
-  //   after. Save the query id to StreamingPythonRunner mapping. This mapping should be
-  //   part of the SessionHolder.
-  //   When a query is terminated, check the mapping and terminate any associated runner.
-  //   These runners should be terminated when a session is deleted (due to timeout, etc).
+  /**
+   * This manages cache from queries to cleaner for runners used for streaming queries. This is
+   * used in [[SessionHolder]].
+   */
+  class CleanerCache(sessionHolder: SessionHolder) {
+
+    private case class CacheKey(queryId: String, runId: String)
+
+    // Mapping from streaming (queryId, runId) to runner cleaner. Used for Python foreachBatch.
+    private val cleanerCache: ConcurrentMap[CacheKey, AutoCloseable] = new ConcurrentHashMap()
+
+    private lazy val streamingListener = { // Initialized on first registered query
+      val listener = new StreamingRunnerCleanerListener
+      sessionHolder.session.streams.addListener(listener)
+      logInfo(s"Registered runner clean up listener for session ${sessionHolder.sessionId}")
+      listener
+    }
+
+    private[connect] def registerCleanerForQuery(
+        query: StreamingQuery,
+        cleaner: AutoCloseable): Unit = {
+
+      streamingListener // Access to initialize
+      val key = CacheKey(query.id.toString, query.runId.toString)
+
+      Option(cleanerCache.putIfAbsent(key, cleaner)) match {
+        case Some(_) =>
+          throw new IllegalStateException(s"Unexpected: a cleaner for query $key is already set")

Review Comment:
   Can we use the new error framework? Something like [this](https://github.com/apache/spark/blob/d4629563492ec3090b5bd5b924790507c42f4e86/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala#L329-L332) would be good





[GitHub] [spark] rangadi commented on a diff in pull request #42460: [SPARK-44433] Terminate foreach batch runner when streaming query terminates

Posted by "rangadi (via GitHub)" <gi...@apache.org>.
rangadi commented on code in PR #42460:
URL: https://github.com/apache/spark/pull/42460#discussion_r1293963420


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -2850,16 +2857,26 @@ class SparkConnectPlanner(val sessionHolder: SessionHolder) extends Logging {
       writer.foreachBatch(foreachBatchFn)
     }
 
-    val query = writeOp.getPath match {
-      case "" if writeOp.hasTableName => writer.toTable(writeOp.getTableName)
-      case "" => writer.start()
-      case path => writer.start(path)
-    }
+    val query =
+      try {
+        writeOp.getPath match {
+          case "" if writeOp.hasTableName => writer.toTable(writeOp.getTableName)
+          case "" => writer.start()
+          case path => writer.start(path)
+        }
+      } catch {
+        case NonFatal(ex) => // Failed to start the query, clean up foreach runner if any.

Review Comment:
   That is the usual best practice. The expectation is that any error not matched by `NonFatal` is a fatal error and is not good to catch.
   One common error that is not matched by `NonFatal` is `OutOfMemoryError`.
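   
   As a standalone illustration (not the PR code), `NonFatal` matches ordinary application exceptions but lets JVM-fatal errors propagate:
   
   ```scala
   import scala.util.control.NonFatal
   
   // Runs `body`; on an ordinary (non-fatal) exception, runs `cleanup` and rethrows.
   def startOrCleanup[T](cleanup: () => Unit)(body: => T): T = {
     try {
       body
     } catch {
       case NonFatal(ex) =>
         // Matches e.g. SparkException or IllegalStateException.
         cleanup()
         throw ex
       // VirtualMachineError (including OutOfMemoryError), InterruptedException, etc.
       // are not matched by NonFatal and propagate untouched.
     }
   }
   ```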



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -2850,16 +2857,26 @@ class SparkConnectPlanner(val sessionHolder: SessionHolder) extends Logging {
       writer.foreachBatch(foreachBatchFn)
     }
 
-    val query = writeOp.getPath match {
-      case "" if writeOp.hasTableName => writer.toTable(writeOp.getTableName)
-      case "" => writer.start()
-      case path => writer.start(path)
-    }
+    val query =
+      try {
+        writeOp.getPath match {
+          case "" if writeOp.hasTableName => writer.toTable(writeOp.getTableName)
+          case "" => writer.start()
+          case path => writer.start(path)
+        }
+      } catch {
+        case NonFatal(ex) => // Failed to start the query, clean up foreach runner if any.

Review Comment:
   See spark style guide: https://github.com/databricks/scala-style-guide#exception-handling-try-vs-try



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingForeachBatchHelper.scala:
##########
@@ -31,6 +38,17 @@ object StreamingForeachBatchHelper extends Logging {
 
   type ForeachBatchFnType = (DataFrame, Long) => Unit
 
+  case class RunnerCleaner(runner: StreamingPythonRunner) extends AutoCloseable {
+    override def close(): Unit = {
+      try runner.stop()
+      catch {
+        case NonFatal(ex) =>
+          logWarning("Error while stopping streaming Python worker", ex)
+        // Exception is not propagated.

Review Comment:
   The indentation is due to scalafmt. I will move it up so that it is aligned better. 
   
   Catching only `NonFatal` is good practice: it catches application errors rather than JVM-level fatal errors. This is especially important when swallowing exceptions. See the Scala style guide that Spark follows: https://github.com/databricks/scala-style-guide#exception-handling-try-vs-try



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -2850,16 +2857,26 @@ class SparkConnectPlanner(val sessionHolder: SessionHolder) extends Logging {
       writer.foreachBatch(foreachBatchFn)
     }
 
-    val query = writeOp.getPath match {
-      case "" if writeOp.hasTableName => writer.toTable(writeOp.getTableName)
-      case "" => writer.start()
-      case path => writer.start(path)
-    }
+    val query =
+      try {
+        writeOp.getPath match {
+          case "" if writeOp.hasTableName => writer.toTable(writeOp.getTableName)
+          case "" => writer.start()
+          case path => writer.start(path)
+        }
+      } catch {
+        case NonFatal(ex) => // Failed to start the query, clean up foreach runner if any.
+          foreachBatchRunnerCleaner.foreach(_.close())
+          throw ex
+      }
 
     // Register the new query so that the session and query references are cached.
-    SparkConnectService.streamingSessionManager.registerNewStreamingQuery(
-      sessionHolder = SessionHolder(userId = userId, sessionId = sessionId, session),
-      query = query)
+    SparkConnectService.streamingSessionManager.registerNewStreamingQuery(sessionHolder, query)
+    // Register the runner with the query if Python foreachBatch is enabled.
+    foreachBatchRunnerCleaner.foreach { cleaner =>
+      sessionHolder.streamingRunnerCleanerCache.registerCleanerForQuery(query, cleaner)
+    }
+    // Register the new query so that the session and query references are cached.

Review Comment:
   Removed. Also updated the original comment.



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamingQueryCache.scala:
##########
@@ -180,13 +185,16 @@ private[connect] class SparkConnectStreamingQueryCache(
           case Some(_) => // Inactive query waiting for expiration. Do nothing.
             logInfo(s"Waiting for the expiration for $id in session ${v.sessionId}")
 
-          case None => // Active query, check if it is stopped. Keep the session alive.
+          case None => // Active query, check if it is stopped. Enable timeout if it is stopped.
             val isActive = v.query.isActive && Option(v.session.streams.get(id)).nonEmpty
 
             if (!isActive) {
               logInfo(s"Marking query $id in session ${v.sessionId} inactive.")
               val expiresAtMs = nowMs + stoppedQueryInactivityTimeout.toMillis
               queryCache.put(k, v.copy(expiresAtMs = Some(expiresAtMs)))
+              // To consider: Clean up any runner registered for this query with the session holder

Review Comment:
   Shall I do this in the PR? Testing might be tricky. Updated the comment to clarify that these would be cleaned up when the session expires.



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala:
##########
@@ -153,7 +158,7 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio
     logDebug(s"Expiring session with userId: $userId and sessionId: $sessionId")
     artifactManager.cleanUpResources()
     eventManager.postClosed()
-
+    streamingRunnerCleanerCache.cleanUpAll()

Review Comment:
   Updated. I was thinking about which one should go first; stopping the Python workers after stopping the queries is better.
   The streaming listener event might fire after the worker is stopped; that is OK.



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingForeachBatchHelper.scala:
##########
@@ -111,17 +128,79 @@ object StreamingForeachBatchHelper extends Logging {
       logInfo(s"Python foreach batch for dfId ${args.dfId} completed (ret: $ret)")
     }
 
-    dataFrameCachingWrapper(foreachBatchRunnerFn, sessionHolder)
+    (dataFrameCachingWrapper(foreachBatchRunnerFn, sessionHolder), RunnerCleaner(runner))
   }
 
-  // TODO(SPARK-44433): Improve termination of Processes
-  //   The goal is that when a query is terminated, the python process associated with foreachBatch
-  //   should be terminated. One way to do that is by registering streaming query listener:
-  //   After pythonForeachBatchWrapper() is invoked by the SparkConnectPlanner.
-  //   At that time, we don't have the streaming queries yet.
-  //   Planner should call back into this helper with the query id when it starts it immediately
-  //   after. Save the query id to StreamingPythonRunner mapping. This mapping should be
-  //   part of the SessionHolder.
-  //   When a query is terminated, check the mapping and terminate any associated runner.
-  //   These runners should be terminated when a session is deleted (due to timeout, etc).
+  /**
+   * This manages cache from queries to cleaner for runners used for streaming queries. This is
+   * used in [[SessionHolder]].
+   */
+  class CleanerCache(sessionHolder: SessionHolder) {
+
+    private case class CacheKey(queryId: String, runId: String)
+
+    // Mapping from streaming (queryId, runId) to runner cleaner. Used for Python foreachBatch.
+    private val cleanerCache: ConcurrentMap[CacheKey, AutoCloseable] = new ConcurrentHashMap()
+
+    private lazy val streamingListener = { // Initialized on first registered query
+      val listener = new StreamingRunnerCleanerListener
+      sessionHolder.session.streams.addListener(listener)
+      logInfo(s"Registered runner clean up listener for session ${sessionHolder.sessionId}")
+      listener
+    }
+
+    private[connect] def registerCleanerForQuery(
+        query: StreamingQuery,
+        cleaner: AutoCloseable): Unit = {
+
+      streamingListener // Access to initialize
+      val key = CacheKey(query.id.toString, query.runId.toString)
+
+      Option(cleanerCache.putIfAbsent(key, cleaner)) match {
+        case Some(_) =>
+          throw new IllegalStateException(s"Unexpected: a cleaner for query $key is already set")

Review Comment:
   This is an internal check. It should never happen (unlike `SESSION_NOT_FOUND`, which can be triggered by a user sending a stale session id).





[GitHub] [spark] rangadi commented on pull request #42460: [SPARK-44433] Terminate foreach batch runner when streaming query terminates

Posted by "rangadi (via GitHub)" <gi...@apache.org>.
rangadi commented on PR #42460:
URL: https://github.com/apache/spark/pull/42460#issuecomment-1675398277

   cc: @WweiL @bogao007 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org