Posted to reviews@spark.apache.org by "rangadi (via GitHub)" <gi...@apache.org> on 2023/08/18 07:23:39 UTC

[GitHub] [spark] rangadi opened a new pull request, #42555: [SPARK-44433][3.5X] Terminate foreach batch runner when streaming query terminates

rangadi opened a new pull request, #42555:
URL: https://github.com/apache/spark/pull/42555

   [This is a 3.5x port of #42460 in master. It resolves a couple of conflicts.]
   
   This terminates the Python worker created for `foreachBatch` when the streaming query terminates. All of the tracking is done inside the Connect server (inside `StreamingForeachBatchHelper`). How this works:
   
   * (A) The helper class returns a cleaner (an `AutoCloseable`) to the Connect server when the `foreachBatch` function is set up (this happens before the query is started).
   * (B) If the query fails to start, the server directly invokes the cleaner.
   * (C) If the query starts up, the server registers the cleaner with `streamingRunnerCleanerCache` in the `SessionHolder`.
   * (D) The cache keeps a mapping from query to cleaner.
   * (E) The cache registers a streaming listener (only once per session), which invokes the cleaner when a query terminates.
   * (F) There is also a final cleanup when the `SessionHolder` expires.
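   The steps above can be modelled as a small, dependency-free Scala sketch. It is hypothetical and is not the code in `StreamingForeachBatchHelper`: `SketchCleanerCache` and `FakeListenerBus` are illustrative names, query ids are plain strings, and the fake bus stands in for Spark's streaming query listener so the example runs without Spark.

```scala
import scala.collection.mutable
import scala.util.control.NonFatal

// Hypothetical stand-in for Spark's streaming listener bus: it only fans out
// "query terminated" events to registered callbacks.
final class FakeListenerBus {
  private val listeners = mutable.ArrayBuffer.empty[String => Unit]
  def addListener(listener: String => Unit): Unit = listeners += listener
  def postQueryTerminated(queryId: String): Unit = listeners.foreach(_(queryId))
  def listenerCount: Int = listeners.size
}

// Hypothetical per-session cache mapping a query id to its cleaner (step D).
final class SketchCleanerCache(bus: FakeListenerBus) {
  private val cleaners = mutable.Map.empty[String, AutoCloseable]
  private var listenerRegistered = false

  // Step C: called only after the query has started successfully.
  def registerCleaner(queryId: String, cleaner: AutoCloseable): Unit = synchronized {
    // Step E: register the termination listener lazily, once per session.
    if (!listenerRegistered) {
      bus.addListener(onQueryTerminated)
      listenerRegistered = true
    }
    cleaners.put(queryId, cleaner)
  }

  // Invoked for every terminated query; queries without a cleaner are ignored.
  def onQueryTerminated(queryId: String): Unit = {
    val cleaner = synchronized { cleaners.remove(queryId) }
    cleaner.foreach(closeQuietly)
  }

  // Step F: final cleanup when the session expires.
  def cleanUpAll(): Unit = {
    val remaining = synchronized {
      val all = cleaners.values.toList
      cleaners.clear()
      all
    }
    remaining.foreach(closeQuietly)
  }

  def size: Int = synchronized { cleaners.size }

  // Closing one runner must not prevent the others from being closed.
  private def closeQuietly(cleaner: AutoCloseable): Unit =
    try cleaner.close()
    catch { case NonFatal(_) => () }
}
```

   In this sketch the cleaners are removed from the map under the lock but closed outside it, so a slow `close()` does not block other sessions' registrations on the same cache.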
   
   This ensures that the Python process created for a streaming query is properly terminated when the query terminates.
   
   ### Does this PR introduce _any_ user-facing change?
   
   No
   
   ### How was this patch tested?
   
   - Unit tests are added for `CleanerCache`
   - Existing unit tests for foreachBatch.
   - Manual tests to verify that the Python process is terminated in different cases.
   - The unit tests don't verify that the process is actually terminated. A follow-up PR will add this verification.
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] rangadi commented on pull request #42555: [SPARK-44433][3.5X] Terminate foreach batch runner when streaming query terminates

Posted by "rangadi (via GitHub)" <gi...@apache.org>.
rangadi commented on PR #42555:
URL: https://github.com/apache/spark/pull/42555#issuecomment-1684070000

   The failures are known flaky tests in `sql`. We can merge this. The 4.x PR is already merged.
   




[GitHub] [spark] melihsozdinler commented on a diff in pull request #42555: [SPARK-44433][3.5X] Terminate foreach batch runner when streaming query terminates

Posted by "melihsozdinler (via GitHub)" <gi...@apache.org>.
melihsozdinler commented on code in PR #42555:
URL: https://github.com/apache/spark/pull/42555#discussion_r1298159023


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -2872,16 +2879,26 @@ class SparkConnectPlanner(val sessionHolder: SessionHolder) extends Logging {
       writer.foreachBatch(foreachBatchFn)
     }
 
-    val query = writeOp.getPath match {
-      case "" if writeOp.hasTableName => writer.toTable(writeOp.getTableName)
-      case "" => writer.start()
-      case path => writer.start(path)
-    }
+    val query =
+      try {
+        writeOp.getPath match {
+          case "" if writeOp.hasTableName => writer.toTable(writeOp.getTableName)
+          case "" => writer.start()
+          case path => writer.start(path)
+        }
+      } catch {
+        case NonFatal(ex) => // Failed to start the query, clean up foreach runner if any.
+          logInfo(s"Removing foreachBatch worker, query failed to start for session $sessionId.")

Review Comment:
   Should the severity be warning or error level?





[GitHub] [spark] gengliangwang closed pull request #42555: [SPARK-44433][3.5X] Terminate foreach batch runner when streaming query terminates

Posted by "gengliangwang (via GitHub)" <gi...@apache.org>.
gengliangwang closed pull request #42555: [SPARK-44433][3.5X] Terminate foreach batch runner when streaming query terminates
URL: https://github.com/apache/spark/pull/42555




[GitHub] [spark] rangadi commented on pull request #42555: [SPARK-44433][3.5X] Terminate foreach batch runner when streaming query terminates

Posted by "rangadi (via GitHub)" <gi...@apache.org>.
rangadi commented on PR #42555:
URL: https://github.com/apache/spark/pull/42555#issuecomment-1684220522

   @gengliangwang, please merge this into 3.5x.




[GitHub] [spark] gengliangwang commented on pull request #42555: [SPARK-44433][3.5X] Terminate foreach batch runner when streaming query terminates

Posted by "gengliangwang (via GitHub)" <gi...@apache.org>.
gengliangwang commented on PR #42555:
URL: https://github.com/apache/spark/pull/42555#issuecomment-1684232655

   Merged to 3.5




[GitHub] [spark] rangadi commented on a diff in pull request #42555: [SPARK-44433][3.5X] Terminate foreach batch runner when streaming query terminates

Posted by "rangadi (via GitHub)" <gi...@apache.org>.
rangadi commented on code in PR #42555:
URL: https://github.com/apache/spark/pull/42555#discussion_r1298566448


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -2872,16 +2879,26 @@ class SparkConnectPlanner(val sessionHolder: SessionHolder) extends Logging {
       writer.foreachBatch(foreachBatchFn)
     }
 
-    val query = writeOp.getPath match {
-      case "" if writeOp.hasTableName => writer.toTable(writeOp.getTableName)
-      case "" => writer.start()
-      case path => writer.start(path)
-    }
+    val query =
+      try {
+        writeOp.getPath match {
+          case "" if writeOp.hasTableName => writer.toTable(writeOp.getTableName)
+          case "" => writer.start()
+          case path => writer.start(path)
+        }
+      } catch {
+        case NonFatal(ex) => // Failed to start the query, clean up foreach runner if any.
+          logInfo(s"Removing foreachBatch worker, query failed to start for session $sessionId.")

Review Comment:
   It depends. This is not a log about the query failure; it is only about cleaning up a resource, and it does not log the exception. The actual log for the query failure is emitted elsewhere.
   It is common for queries to fail to start (e.g. due to a user configuration error).
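   A minimal, dependency-free sketch of the start-failure path (step (B) in the PR description) discussed here. It is hypothetical and is not the `SparkConnectPlanner` code: `StartWithCleanup.startOrClean` is an illustrative name, `logInfo` is passed in as a plain function, and since the quoted diff is truncated inside the `catch`, the sketch assumes the exception is rethrown after cleanup so the caller still reports the failure.

```scala
import scala.util.control.NonFatal

object StartWithCleanup {
  // Hypothetical helper modelled on the quoted catch block. `start` stands in for
  // writer.start()/toTable(); `cleaner` is the AutoCloseable from step (A), if any.
  def startOrClean[Q](cleaner: Option[AutoCloseable], sessionId: String, logInfo: String => Unit)(
      start: => Q): Q =
    try start
    catch {
      case NonFatal(ex) =>
        // Info level: this only reports resource cleanup; the start failure itself
        // propagates to the caller (assumed rethrow) and is reported there.
        logInfo(s"Removing foreachBatch worker, query failed to start for session $sessionId.")
        cleaner.foreach { c =>
          // Keep the original failure; attach any cleanup error to it instead.
          try c.close()
          catch { case NonFatal(closeError) => ex.addSuppressed(closeError) }
        }
        throw ex
    }
}
```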


