You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by fe...@apache.org on 2022/12/27 07:01:45 UTC

[kyuubi] branch master updated: [KYUUBI #4026] [PYSPARK] Fail if the session python worker process has been exited

This is an automated email from the ASF dual-hosted git repository.

feiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new a5a3e201a [KYUUBI #4026] [PYSPARK] Fail if the session python worker process has been exited
a5a3e201a is described below

commit a5a3e201a79f5d7aeb684a51d452bf47bc7217a3
Author: fwang12 <fw...@ebay.com>
AuthorDate: Tue Dec 27 15:01:36 2022 +0800

    [KYUUBI #4026] [PYSPARK] Fail if the session python worker process has been exited
    
    ### _Why are the changes needed?_
    
    Previously, if the PySpark environment was not set up correctly, the Python response was always `None`.
    With this PR, running Python code fails with an explicit error if the session's Python worker process has exited.
    
    Also: blank lines from the Python worker's stderr are no longer written to the error log.
    <img width="1422" alt="image" src="https://user-images.githubusercontent.com/6757692/209502683-49aa9088-8686-4a54-b88c-85881a3fb089.png">
    
    ### _How was this patch tested?_
    - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request
    
    Closes #4026 from turboFei/python_exec.
    
    Closes #4026
    
    499e19b54 [fwang12] more insights
    17cefc02e [fwang12] Fail if the session python worker has been exited
    
    Authored-by: fwang12 <fw...@ebay.com>
    Signed-off-by: fwang12 <fw...@ebay.com>
---
 .../org/apache/kyuubi/engine/spark/operation/ExecutePython.scala    | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala
index a27a8a023..06d1e19a2 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala
@@ -197,6 +197,10 @@ case class SessionPythonWorker(
    * @return the python response
    */
   def runCode(code: String, internal: Boolean = false): Option[PythonResponse] = withLockRequired {
+    if (!workerProcess.isAlive) {
+      throw KyuubiSQLException("Python worker process has been exited, please check the error log" +
+        " and re-create the session to run python code.")
+    }
     val input = ExecutePython.toJson(Map("code" -> code, "cmd" -> "run_code"))
     // scalastyle:off println
     stdin.println(input)
@@ -337,7 +341,7 @@ object ExecutePython extends Logging {
     val stderrThread = new Thread("process stderr thread") {
       override def run(): Unit = {
         val lines = scala.io.Source.fromInputStream(process.getErrorStream).getLines()
-        lines.foreach(logger.error)
+        lines.filter(_.trim.nonEmpty).foreach(logger.error)
       }
     }
     stderrThread.setDaemon(true)