Posted to reviews@spark.apache.org by "bogao007 (via GitHub)" <gi...@apache.org> on 2023/08/14 20:13:58 UTC

[GitHub] [spark] bogao007 commented on a diff in pull request #42460: [SPARK-44433] Terminate foreach batch runner when streaming query terminates

bogao007 commented on code in PR #42460:
URL: https://github.com/apache/spark/pull/42460#discussion_r1293924533


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala:
##########
@@ -153,7 +158,7 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio
     logDebug(s"Expiring session with userId: $userId and sessionId: $sessionId")
     artifactManager.cleanUpResources()
     eventManager.postClosed()
-
+    streamingRunnerCleanerCache.cleanUpAll()

Review Comment:
   +1



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingForeachBatchHelper.scala:
##########
@@ -111,17 +128,79 @@ object StreamingForeachBatchHelper extends Logging {
       logInfo(s"Python foreach batch for dfId ${args.dfId} completed (ret: $ret)")
     }
 
-    dataFrameCachingWrapper(foreachBatchRunnerFn, sessionHolder)
+    (dataFrameCachingWrapper(foreachBatchRunnerFn, sessionHolder), RunnerCleaner(runner))
   }
 
-  // TODO(SPARK-44433): Improve termination of Processes
-  //   The goal is that when a query is terminated, the python process associated with foreachBatch
-  //   should be terminated. One way to do that is by registering streaming query listener:
-  //   After pythonForeachBatchWrapper() is invoked by the SparkConnectPlanner.
-  //   At that time, we don't have the streaming queries yet.
-  //   Planner should call back into this helper with the query id when it starts it immediately
-  //   after. Save the query id to StreamingPythonRunner mapping. This mapping should be
-  //   part of the SessionHolder.
-  //   When a query is terminated, check the mapping and terminate any associated runner.
-  //   These runners should be terminated when a session is deleted (due to timeout, etc).
+  /**
+   * This manages cache from queries to cleaner for runners used for streaming queries. This is
+   * used in [[SessionHolder]].
+   */
+  class CleanerCache(sessionHolder: SessionHolder) {
+
+    private case class CacheKey(queryId: String, runId: String)
+
+    // Mapping from streaming (queryId, runId) to runner cleaner. Used for Python foreachBatch.
+    private val cleanerCache: ConcurrentMap[CacheKey, AutoCloseable] = new ConcurrentHashMap()
+
+    private lazy val streamingListener = { // Initialized on first registered query
+      val listener = new StreamingRunnerCleanerListener
+      sessionHolder.session.streams.addListener(listener)
+      logInfo(s"Registered runner clean up listener for session ${sessionHolder.sessionId}")
+      listener
+    }
+
+    private[connect] def registerCleanerForQuery(
+        query: StreamingQuery,
+        cleaner: AutoCloseable): Unit = {
+
+      streamingListener // Access to initialize
+      val key = CacheKey(query.id.toString, query.runId.toString)
+
+      Option(cleanerCache.putIfAbsent(key, cleaner)) match {
+        case Some(_) =>
+          throw new IllegalStateException(s"Unexpected: a cleaner for query $key is already set")

Review Comment:
   Can we use the new error framework here instead of throwing a raw `IllegalStateException`? Something like [this](https://github.com/apache/spark/blob/d4629563492ec3090b5bd5b924790507c42f4e86/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala#L329-L332) would be good.
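
   For illustration, a rough self-contained sketch of what the duplicate-registration check could look like with an error class and message parameters rather than a free-form message. `ErrorClassException`, `CleanerRegistry`, and the error class name `STREAMING_RUNNER_CLEANER_ALREADY_SET` are hypothetical stand-ins so the snippet compiles without Spark; the real change would presumably use one of Spark's error-framework exceptions and an error class registered in the error classes file.

```scala
import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}

// Hypothetical stand-in for a Spark error-framework exception; not a real Spark class.
final class ErrorClassException(
    val errorClass: String,
    val messageParameters: Map[String, String])
  extends IllegalStateException(s"[$errorClass] $messageParameters")

final case class CacheKey(queryId: String, runId: String)

// Hypothetical, simplified version of the cleaner cache from the diff above.
final class CleanerRegistry {
  private val cleanerCache: ConcurrentMap[CacheKey, AutoCloseable] =
    new ConcurrentHashMap[CacheKey, AutoCloseable]()

  def register(key: CacheKey, cleaner: AutoCloseable): Unit = {
    // putIfAbsent returns null when the key was absent, hence the Option wrapper.
    Option(cleanerCache.putIfAbsent(key, cleaner)).foreach { _ =>
      throw new ErrorClassException(
        errorClass = "STREAMING_RUNNER_CLEANER_ALREADY_SET", // hypothetical name
        messageParameters = Map("queryId" -> key.queryId, "runId" -> key.runId))
    }
  }
}
```

   Keeping the query id and run id as separate message parameters, rather than interpolating the whole key into a string, is what would let the message template live alongside the other error classes.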



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

