You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ch...@apache.org on 2023/03/31 10:57:17 UTC

[kyuubi] branch branch-1.7 updated: [KYUUBI #4644] Manually terminate the Py4JServer during engine shutdown

This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch branch-1.7
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/branch-1.7 by this push:
     new 1a7585580 [KYUUBI #4644] Manually terminate the Py4JServer during engine shutdown
1a7585580 is described below

commit 1a75855808a36f69f4ee07c38fa7ac72ec52747e
Author: Fu Chen <cf...@gmail.com>
AuthorDate: Fri Mar 31 18:56:52 2023 +0800

    [KYUUBI #4644] Manually terminate the Py4JServer during engine shutdown
    
    ### _Why are the changes needed?_
    
    The Py4JServer is started with a non-daemon thread, which can prevent the engine from terminating. Therefore, the Py4JServer must be shut down manually during engine shutdown.
    
    ```
    "Thread-23" #96 prio=5 os_prio=0 cpu=7.93ms elapsed=187532.67s tid=0x00007fee840cf000 nid=0x8f runnable  [0x00007fedca6bf000]
       java.lang.Thread.State: RUNNABLE
            at java.net.PlainSocketImpl.socketAccept(java.base11.0.16/Native Method)
            at java.net.AbstractPlainSocketImpl.accept(java.base11.0.16/Unknown Source)
            at java.net.ServerSocket.implAccept(java.base11.0.16/Unknown Source)
            at java.net.ServerSocket.accept(java.base11.0.16/Unknown Source)
            at py4j.GatewayServer.run(GatewayServer.java:685)
            at java.lang.Thread.run(java.base11.0.16/Unknown Source)
    ```
    
    ### _How was this patch tested?_
    - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before making a pull request
    
    Closes #4644 from cfmcgrady/pyserver-non-daemon.
    
    Closes #4644
    
    d4f1a57a6 [Fu Chen] synchronized
    cdc9630a7 [Fu Chen] shutdown Py4JServer
    
    Authored-by: Fu Chen <cf...@gmail.com>
    Signed-off-by: Cheng Pan <ch...@apache.org>
    (cherry picked from commit 726a831c3ec8f9f5754e936211536160e21023ef)
    Signed-off-by: Cheng Pan <ch...@apache.org>
---
 .../engine/spark/session/SparkSQLSessionManager.scala       |  2 ++
 .../apache/spark/api/python/KyuubiPythonGatewayServer.scala | 13 +++++++++++--
 2 files changed, 13 insertions(+), 2 deletions(-)

diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala
index 677af9a03..79f38ce35 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala
@@ -20,6 +20,7 @@ package org.apache.kyuubi.engine.spark.session
 import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
 
 import org.apache.hive.service.rpc.thrift.TProtocolVersion
+import org.apache.spark.api.python.KyuubiPythonGatewayServer
 import org.apache.spark.sql.SparkSession
 
 import org.apache.kyuubi.KyuubiSQLException
@@ -94,6 +95,7 @@ class SparkSQLSessionManager private (name: String, spark: SparkSession)
 
   override def stop(): Unit = {
     super.stop()
+    KyuubiPythonGatewayServer.shutdown()
     userIsolatedSparkSessionThread.foreach(_.shutdown())
   }
 
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/api/python/KyuubiPythonGatewayServer.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/api/python/KyuubiPythonGatewayServer.scala
index 7e15ffe05..8cf8d685c 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/api/python/KyuubiPythonGatewayServer.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/api/python/KyuubiPythonGatewayServer.scala
@@ -30,10 +30,12 @@ object KyuubiPythonGatewayServer extends Logging {
 
   val CONNECTION_FILE_PATH = Utils.createTempDir() + "/connection.info"
 
-  def start(): Unit = {
+  private var gatewayServer: Py4JServer = _
+
+  def start(): Unit = synchronized {
 
     val sparkConf = new SparkConf()
-    val gatewayServer: Py4JServer = new Py4JServer(sparkConf)
+    gatewayServer = new Py4JServer(sparkConf)
 
     gatewayServer.start()
     val boundPort: Int = gatewayServer.getListeningPort
@@ -65,4 +67,11 @@ object KyuubiPythonGatewayServer extends Logging {
       System.exit(1)
     }
   }
+
+  def shutdown(): Unit = synchronized {
+    if (gatewayServer != null) {
+      logInfo("shutting down the python gateway server.")
+      gatewayServer.shutdown()
+    }
+  }
 }