You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@linkis.apache.org by pe...@apache.org on 2023/03/07 12:00:47 UTC
[linkis] branch dev-1.3.2 updated: fix python watch dog not work (#4295)
This is an automated email from the ASF dual-hosted git repository.
peacewong pushed a commit to branch dev-1.3.2
in repository https://gitbox.apache.org/repos/asf/linkis.git
The following commit(s) were added to refs/heads/dev-1.3.2 by this push:
new 6bb950224 fix python watch dog not work (#4295)
6bb950224 is described below
commit 6bb9502247f0e02ddba3c089504aa353a10f6168
Author: Casion <ca...@gmail.com>
AuthorDate: Tue Mar 7 20:00:39 2023 +0800
fix python watch dog not work (#4295)
---
.../org/apache/linkis/common/utils/Utils.scala | 21 +++++++++++++
.../python/src/main/resources/python/python.py | 16 +++++-----
.../python/executor/PythonSession.scala | 2 +-
.../spark/src/main/resources/python/mix_pyspark.py | 8 ++---
.../spark/config/SparkConfiguration.scala | 8 +++++
.../spark/executor/SparkPythonExecutor.scala | 35 +++++++++++++++++++---
6 files changed, 73 insertions(+), 17 deletions(-)
diff --git a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/utils/Utils.scala b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/utils/Utils.scala
index 3d3bc636a..80e3ff7e5 100644
--- a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/utils/Utils.scala
+++ b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/utils/Utils.scala
@@ -28,6 +28,7 @@ import org.apache.commons.io.IOUtils
import org.apache.commons.lang3.SystemUtils
import java.io.{BufferedReader, InputStreamReader}
+import java.lang.management.ManagementFactory
import java.net.InetAddress
import java.util.concurrent._
import java.util.concurrent.atomic.AtomicInteger
@@ -362,4 +363,24 @@ object Utils extends Logging {
def getJvmUser: String = System.getProperty("user.name")
+  /**
+   * Returns the process id of the current JVM, or `null` when it cannot be determined.
+   *
+   * The id is parsed from the runtime MXBean name, which looks like `<pid>@<hostname>`, at least
+   * in SUN / Oracle JVMs. Other JVM implementations may use a different format or fail, so
+   * callers must be prepared for a `null` result.
+   *
+   * @return
+   *   the text before the first '@' of the runtime MXBean name, or `null` if that part is empty,
+   *   '@' is missing, or the lookup throws
+   */
+  def getProcessId(): String = Utils.tryCatch {
+    // The lookup itself is inside the guarded region because it may fail on some JVMs.
+    val jvmName = ManagementFactory.getRuntimeMXBean.getName
+    val index = jvmName.indexOf('@')
+    if (index < 1) {
+      // part before '@' empty (index = 0) / '@' not found (index = -1)
+      logger.warn(s"Cannot parse process Id from JVM name: $jvmName")
+      null
+    } else {
+      val pid = jvmName.substring(0, index)
+      logger.info(s"get java process Id:$pid")
+      pid
+    }
+  } { t =>
+    // Pass the throwable itself so the cause and stack trace are actually logged.
+    logger.warn("Failed to get process Id with error", t)
+    null
+  }
+
}
diff --git a/linkis-engineconn-plugins/python/src/main/resources/python/python.py b/linkis-engineconn-plugins/python/src/main/resources/python/python.py
index 1b040a2bb..15e4df2ab 100644
--- a/linkis-engineconn-plugins/python/src/main/resources/python/python.py
+++ b/linkis-engineconn-plugins/python/src/main/resources/python/python.py
@@ -213,21 +213,21 @@ def java_watchdog_thread(sleep=10):
while True :
time.sleep(sleep)
try:
- intp.kind()
+ intp.getKind()
except Exception as e:
- print("Failed to detect java daemon, now exit python process")
- print(e)
- sys.exit(1)
+ # Failed to detect java daemon, now exit python process.
+ # sys.exit() only ends the calling thread, so use os._exit() to terminate the whole process; see https://stackoverflow.com/questions/905189/why-does-sys-exit-not-exit-when-called-inside-a-thread-in-python
+ os._exit(1)
watchdog_thread = threading.Thread(target=java_watchdog_thread)
watchdog_thread.daemon = True
watchdog_thread.start()
while True :
- req = intp.getStatements()
- if req == None:
- break
-
try:
+ req = intp.getStatements()
+ if req == None:
+ break
+
stmts = req.statements().split("\n")
final_code = None
# ori_code = req.statements()
diff --git a/linkis-engineconn-plugins/python/src/main/scala/org/apache/linkis/manager/engineplugin/python/executor/PythonSession.scala b/linkis-engineconn-plugins/python/src/main/scala/org/apache/linkis/manager/engineplugin/python/executor/PythonSession.scala
index 843ed4e19..79f8f7dc6 100644
--- a/linkis-engineconn-plugins/python/src/main/scala/org/apache/linkis/manager/engineplugin/python/executor/PythonSession.scala
+++ b/linkis-engineconn-plugins/python/src/main/scala/org/apache/linkis/manager/engineplugin/python/executor/PythonSession.scala
@@ -280,7 +280,7 @@ class PythonSession extends Logging {
logger.info("python executor Finished to close")
}
- def kind: Kind = Python()
+ def getKind: Kind = Python()
def changeDT(dt: String): DataType = dt match {
case "int" | "int16" | "int32" | "int64" => IntType
diff --git a/linkis-engineconn-plugins/spark/src/main/resources/python/mix_pyspark.py b/linkis-engineconn-plugins/spark/src/main/resources/python/mix_pyspark.py
index b81697def..adbf5ecad 100644
--- a/linkis-engineconn-plugins/spark/src/main/resources/python/mix_pyspark.py
+++ b/linkis-engineconn-plugins/spark/src/main/resources/python/mix_pyspark.py
@@ -236,9 +236,9 @@ def java_watchdog_thread(sleep=10):
try:
intp.getKind()
except Exception as e:
- print("Failed to detect java daemon, now exit python process")
- print(e)
- sys.exit(1)
+ # Failed to detect java daemon, now exit python process.
+ # sys.exit() only ends the calling thread, so use os._exit() to terminate the whole process; see https://stackoverflow.com/questions/905189/why-does-sys-exit-not-exit-when-called-inside-a-thread-in-python
+ os._exit(1)
watchdog_thread = threading.Thread(target=java_watchdog_thread)
watchdog_thread.daemon = True
watchdog_thread.start()
@@ -260,8 +260,8 @@ setup_plt_show()
while True :
- req = intp.getStatements()
try:
+ req = intp.getStatements()
stmts = req.statements().split("\n")
jobGroup = req.jobGroup()
final_code = None
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala
index c4370c7eb..966caa03a 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala
@@ -52,6 +52,14 @@ object SparkConfiguration extends Logging {
val SPARK_PYTHON_VERSION = CommonVars[String]("spark.python.version", "python")
+ val SPARK_PYTHON_TEST_MODE_ENABLE =
+ CommonVars[Boolean]("linkis.spark.python.test.mode.enable", false)
+
+ val SPARK_PYTHON_TEST_MODE_MIX__PYSHELL_PATH = CommonVars[String](
+ "linkis.spark.python.mix.pyshell.path",
+ "/appcom/Install/linkis/mix_pyspark.py"
+ )
+
val SPARK_EXTRA_JARS = CommonVars[String](
"spark.jars",
"",
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkPythonExecutor.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkPythonExecutor.scala
index c1b1b96cf..11964120b 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkPythonExecutor.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkPythonExecutor.scala
@@ -120,9 +120,16 @@ class SparkPythonExecutor(val sparkEngineSession: SparkEngineSession, val id: In
}
IOUtils.closeQuietly(lineOutputStream)
Utils.tryAndErrorMsg {
- pid.foreach(p => Utils.exec(Array("kill", "-9", p), 3000L))
process.destroy()
process = null
+ Thread.sleep(1000 * 2L)
+ // process.destroy() kills the subprocess, so there is no need to force kill with -9;
+ // kill -9 may cause resources not to be released
+ pid.foreach(p => {
+ logger.info(s"Try to kill pyspark process with: [kill -15 ${p}]")
+ Utils.exec(Array("kill", "-15", p), 3000L)
+ })
+
}("process close failed")
}
logger.info(s"To delete python executor")
@@ -187,11 +194,24 @@ class SparkPythonExecutor(val sparkEngineSession: SparkEngineSession, val id: In
.foreach(pythonClasspath ++= File.pathSeparator ++= _)
val cmd = CommandLine.parse(pythonExec)
- cmd.addArgument(createFakeShell(pythonScriptPath).getAbsolutePath, false)
+ if (SparkConfiguration.SPARK_PYTHON_TEST_MODE_ENABLE.getValue) {
+ val path = SparkConfiguration.SPARK_PYTHON_TEST_MODE_MIX__PYSHELL_PATH.getValue
+ logger.info(
+ s"${SparkConfiguration.SPARK_PYTHON_TEST_MODE_ENABLE.key} => true, will use ${SparkConfiguration.SPARK_PYTHON_TEST_MODE_MIX__PYSHELL_PATH.key}:${path}"
+ )
+ cmd.addArgument(path, false)
+ } else {
+ cmd.addArgument(createFakeShell(pythonScriptPath).getAbsolutePath, false)
+ }
+
cmd.addArgument(port.toString, false)
cmd.addArgument(EngineUtils.sparkSubmitVersion().replaceAll("\\.", ""), false)
cmd.addArgument(py4jToken, false)
cmd.addArgument(pythonClasspath.toString(), false)
+
+ // add java process pid
+ // cmd.addArgument(Utils.getProcessId(), false)
+
cmd.addArgument(pyFiles, false)
val builder = new ProcessBuilder(cmd.toStrings.toSeq.toList.asJava)
@@ -220,13 +240,16 @@ class SparkPythonExecutor(val sparkEngineSession: SparkEngineSession, val id: In
// add hook to shutdown python
Utils.addShutdownHook {
close
- Utils.tryAndError(pid.foreach(p => Utils.exec(Array("kill", "-9", p), 3000L)))
+// Utils.tryAndError(pid.foreach(p => {
+// logger.info(s"try to kill Pyspark process with: [kill -9 ${p}]")
+// Utils.exec(Array("kill", "-9", p), 3000L)
+// }))
}
Future {
val exitCode = process.waitFor()
pythonScriptInitialized = false
- logger.info("Pyspark process has stopped with exit code " + exitCode)
+ logger.info("Pyspark process has stopped with exit code " + exitCode)
// close
Utils.tryFinally({
if (promise != null && !promise.isCompleted) {
@@ -341,6 +364,9 @@ class SparkPythonExecutor(val sparkEngineSession: SparkEngineSession, val id: In
if (!pythonScriptInitialized) {
logger.info(message)
} else {
+ if (SparkConfiguration.SPARK_PYTHON_TEST_MODE_ENABLE.getValue) {
+ logger.info(message)
+ }
lineOutputStream.write(message.getBytes("utf-8"))
}
}
@@ -407,6 +433,7 @@ class SparkPythonExecutor(val sparkEngineSession: SparkEngineSession, val id: In
override protected def getExecutorIdPreFix: String = "SparkPythonExecutor_"
def printLog(log: Any): Unit = {
+ logger.info(log.toString)
if (engineExecutionContext != null) {
engineExecutionContext.appendStdout("+++++++++++++++")
engineExecutionContext.appendStdout(log.toString)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org