You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ch...@apache.org on 2023/03/31 10:57:17 UTC
[kyuubi] branch branch-1.7 updated: [KYUUBI #4644] Manually terminate the Py4JServer during engine shutdown
This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch branch-1.7
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/branch-1.7 by this push:
new 1a7585580 [KYUUBI #4644] Manually terminate the Py4JServer during engine shutdown
1a7585580 is described below
commit 1a75855808a36f69f4ee07c38fa7ac72ec52747e
Author: Fu Chen <cf...@gmail.com>
AuthorDate: Fri Mar 31 18:56:52 2023 +0800
[KYUUBI #4644] Manually terminate the Py4JServer during engine shutdown
### _Why are the changes needed?_
The Py4JServer runs its listener on a non-daemon thread, which can prevent the engine process from exiting. We therefore need to shut down the Py4JServer explicitly during engine shutdown. The thread dump below shows the lingering listener thread still blocked in `accept()`:
```
"Thread-23" #96 prio=5 os_prio=0 cpu=7.93ms elapsed=187532.67s tid=0x00007fee840cf000 nid=0x8f runnable [0x00007fedca6bf000]
java.lang.Thread.State: RUNNABLE
at java.net.PlainSocketImpl.socketAccept(java.base@11.0.16/Native Method)
at java.net.AbstractPlainSocketImpl.accept(java.base@11.0.16/Unknown Source)
at java.net.ServerSocket.implAccept(java.base@11.0.16/Unknown Source)
at java.net.ServerSocket.accept(java.base@11.0.16/Unknown Source)
at py4j.GatewayServer.run(GatewayServer.java:685)
at java.lang.Thread.run(java.base@11.0.16/Unknown Source)
```
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before making a pull request
Closes #4644 from cfmcgrady/pyserver-non-daemon.
Closes #4644
d4f1a57a6 [Fu Chen] synchronized
cdc9630a7 [Fu Chen] shutdown Py4JServer
Authored-by: Fu Chen <cf...@gmail.com>
Signed-off-by: Cheng Pan <ch...@apache.org>
(cherry picked from commit 726a831c3ec8f9f5754e936211536160e21023ef)
Signed-off-by: Cheng Pan <ch...@apache.org>
---
.../engine/spark/session/SparkSQLSessionManager.scala | 2 ++
.../apache/spark/api/python/KyuubiPythonGatewayServer.scala | 13 +++++++++++--
2 files changed, 13 insertions(+), 2 deletions(-)
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala
index 677af9a03..79f38ce35 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala
@@ -20,6 +20,7 @@ package org.apache.kyuubi.engine.spark.session
import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
import org.apache.hive.service.rpc.thrift.TProtocolVersion
+import org.apache.spark.api.python.KyuubiPythonGatewayServer
import org.apache.spark.sql.SparkSession
import org.apache.kyuubi.KyuubiSQLException
@@ -94,6 +95,7 @@ class SparkSQLSessionManager private (name: String, spark: SparkSession)
override def stop(): Unit = {
super.stop()
+ KyuubiPythonGatewayServer.shutdown()
userIsolatedSparkSessionThread.foreach(_.shutdown())
}
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/api/python/KyuubiPythonGatewayServer.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/api/python/KyuubiPythonGatewayServer.scala
index 7e15ffe05..8cf8d685c 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/api/python/KyuubiPythonGatewayServer.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/api/python/KyuubiPythonGatewayServer.scala
@@ -30,10 +30,12 @@ object KyuubiPythonGatewayServer extends Logging {
val CONNECTION_FILE_PATH = Utils.createTempDir() + "/connection.info"
- def start(): Unit = {
+ private var gatewayServer: Py4JServer = _
+
+ def start(): Unit = synchronized {
val sparkConf = new SparkConf()
- val gatewayServer: Py4JServer = new Py4JServer(sparkConf)
+ gatewayServer = new Py4JServer(sparkConf)
gatewayServer.start()
val boundPort: Int = gatewayServer.getListeningPort
@@ -65,4 +67,11 @@ object KyuubiPythonGatewayServer extends Logging {
System.exit(1)
}
}
+
+ def shutdown(): Unit = synchronized {
+ if (gatewayServer != null) {
+ logInfo("shutting down the python gateway server.")
+ gatewayServer.shutdown()
+ }
+ }
}