You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@linkis.apache.org by pe...@apache.org on 2023/03/07 12:00:47 UTC

[linkis] branch dev-1.3.2 updated: fix python watch dog not work (#4295)

This is an automated email from the ASF dual-hosted git repository.

peacewong pushed a commit to branch dev-1.3.2
in repository https://gitbox.apache.org/repos/asf/linkis.git


The following commit(s) were added to refs/heads/dev-1.3.2 by this push:
     new 6bb950224 fix python watch dog not work (#4295)
6bb950224 is described below

commit 6bb9502247f0e02ddba3c089504aa353a10f6168
Author: Casion <ca...@gmail.com>
AuthorDate: Tue Mar 7 20:00:39 2023 +0800

    fix python watch dog not work (#4295)
---
 .../org/apache/linkis/common/utils/Utils.scala     | 21 +++++++++++++
 .../python/src/main/resources/python/python.py     | 16 +++++-----
 .../python/executor/PythonSession.scala            |  2 +-
 .../spark/src/main/resources/python/mix_pyspark.py |  8 ++---
 .../spark/config/SparkConfiguration.scala          |  8 +++++
 .../spark/executor/SparkPythonExecutor.scala       | 35 +++++++++++++++++++---
 6 files changed, 73 insertions(+), 17 deletions(-)

diff --git a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/utils/Utils.scala b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/utils/Utils.scala
index 3d3bc636a..80e3ff7e5 100644
--- a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/utils/Utils.scala
+++ b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/utils/Utils.scala
@@ -28,6 +28,7 @@ import org.apache.commons.io.IOUtils
 import org.apache.commons.lang3.SystemUtils
 
 import java.io.{BufferedReader, InputStreamReader}
+import java.lang.management.ManagementFactory
 import java.net.InetAddress
 import java.util.concurrent._
 import java.util.concurrent.atomic.AtomicInteger
@@ -362,4 +363,24 @@ object Utils extends Logging {
 
   def getJvmUser: String = System.getProperty("user.name")
 
+  // Returns the current JVM's process id, parsed from RuntimeMXBean#getName, which is
+  // something like '<pid>@<hostname>' at least in SUN / Oracle JVMs. That format is not
+  // guaranteed and may differ in other JVM implementations, so null is returned on failure.
+  def getProcessId(): String = Utils.tryCatch {
+    val jvmName = ManagementFactory.getRuntimeMXBean.getName
+    val index = jvmName.indexOf('@')
+    // part before '@' empty (index = 0) / '@' not found (index = -1)
+    if (index < 1) {
+      logger.warn(s"Failed to parse process Id from JVM name: $jvmName")
+      null
+    } else {
+      val pid = jvmName.substring(0, index)
+      logger.info(s"get java process Id:$pid")
+      pid
+    }
+  } { t =>
+    logger.warn(s"Failed to get process Id with error: ${t.getMessage}", t)
+    null
+  }
+
 }
diff --git a/linkis-engineconn-plugins/python/src/main/resources/python/python.py b/linkis-engineconn-plugins/python/src/main/resources/python/python.py
index 1b040a2bb..15e4df2ab 100644
--- a/linkis-engineconn-plugins/python/src/main/resources/python/python.py
+++ b/linkis-engineconn-plugins/python/src/main/resources/python/python.py
@@ -213,21 +213,21 @@ def java_watchdog_thread(sleep=10):
     while True :
         time.sleep(sleep)
         try:
-            intp.kind()
+            intp.getKind()
         except Exception as e:
-            print("Failed to detect java daemon, now exit python process")
-            print(e)
-            sys.exit(1)
+            # Failed to reach the Java daemon, so terminate the whole python process.
+            # sys.exit() would only end this watchdog thread (see https://stackoverflow.com/questions/905189/why-does-sys-exit-not-exit-when-called-inside-a-thread-in-python), hence os._exit.
+            os._exit(1)
 watchdog_thread = threading.Thread(target=java_watchdog_thread)
 watchdog_thread.daemon = True
 watchdog_thread.start()
 
 while True :
-  req = intp.getStatements()
-  if req == None:
-    break
-
   try:
+    req = intp.getStatements()
+    if req == None:
+      break
+
     stmts = req.statements().split("\n")
     final_code = None
 #     ori_code = req.statements()
diff --git a/linkis-engineconn-plugins/python/src/main/scala/org/apache/linkis/manager/engineplugin/python/executor/PythonSession.scala b/linkis-engineconn-plugins/python/src/main/scala/org/apache/linkis/manager/engineplugin/python/executor/PythonSession.scala
index 843ed4e19..79f8f7dc6 100644
--- a/linkis-engineconn-plugins/python/src/main/scala/org/apache/linkis/manager/engineplugin/python/executor/PythonSession.scala
+++ b/linkis-engineconn-plugins/python/src/main/scala/org/apache/linkis/manager/engineplugin/python/executor/PythonSession.scala
@@ -280,7 +280,7 @@ class PythonSession extends Logging {
     logger.info("python executor Finished to close")
   }
 
-  def kind: Kind = Python()
+  def getKind: Kind = Python() // probed as intp.getKind() by python.py's watchdog thread (presumably intp is this session — confirm)
 
   def changeDT(dt: String): DataType = dt match {
     case "int" | "int16" | "int32" | "int64" => IntType
diff --git a/linkis-engineconn-plugins/spark/src/main/resources/python/mix_pyspark.py b/linkis-engineconn-plugins/spark/src/main/resources/python/mix_pyspark.py
index b81697def..adbf5ecad 100644
--- a/linkis-engineconn-plugins/spark/src/main/resources/python/mix_pyspark.py
+++ b/linkis-engineconn-plugins/spark/src/main/resources/python/mix_pyspark.py
@@ -236,9 +236,9 @@ def java_watchdog_thread(sleep=10):
         try:
             intp.getKind()
         except Exception as e:
-            print("Failed to detect java daemon, now exit python process")
-            print(e)
-            sys.exit(1)
+            # Failed to reach the Java daemon, so terminate the whole python process.
+            # sys.exit() would only end this watchdog thread (see https://stackoverflow.com/questions/905189/why-does-sys-exit-not-exit-when-called-inside-a-thread-in-python), hence os._exit.
+            os._exit(1)
 watchdog_thread = threading.Thread(target=java_watchdog_thread)
 watchdog_thread.daemon = True
 watchdog_thread.start()
@@ -260,8 +260,8 @@ setup_plt_show()
 
 
 while True :
-    req = intp.getStatements()
     try:
+        req = intp.getStatements()
         stmts = req.statements().split("\n")
         jobGroup = req.jobGroup()
         final_code = None
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala
index c4370c7eb..966caa03a 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala
@@ -52,6 +52,14 @@ object SparkConfiguration extends Logging {
 
   val SPARK_PYTHON_VERSION = CommonVars[String]("spark.python.version", "python")
 
+  val SPARK_PYTHON_TEST_MODE_ENABLE = // test/debug switch read by SparkPythonExecutor
+    CommonVars[Boolean]("linkis.spark.python.test.mode.enable", false)
+
+  val SPARK_PYTHON_TEST_MODE_MIX__PYSHELL_PATH = CommonVars[String]( // launched instead of createFakeShell(...) when test mode is on
+    "linkis.spark.python.mix.pyshell.path",
+    "/appcom/Install/linkis/mix_pyspark.py" // NOTE(review): deployment-specific default; "MIX__" double underscore looks like a typo
+  )
+
   val SPARK_EXTRA_JARS = CommonVars[String](
     "spark.jars",
     "",
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkPythonExecutor.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkPythonExecutor.scala
index c1b1b96cf..11964120b 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkPythonExecutor.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkPythonExecutor.scala
@@ -120,9 +120,16 @@ class SparkPythonExecutor(val sparkEngineSession: SparkEngineSession, val id: In
       }
       IOUtils.closeQuietly(lineOutputStream)
       Utils.tryAndErrorMsg {
-        pid.foreach(p => Utils.exec(Array("kill", "-9", p), 3000L))
         process.destroy()
         process = null
+        Thread.sleep(1000 * 2L)
+        // process.destroy() already asks the subprocess to terminate, so there is no need to
+        // force-kill it with -9; kill -9 may prevent resources from being released.
+        pid.foreach(p => {
+          logger.info(s"Try to kill pyspark process with: [kill -15 ${p}]")
+          Utils.exec(Array("kill", "-15", p), 3000L)
+        })
+
       }("process close failed")
     }
     logger.info(s"To delete python executor")
@@ -187,11 +194,24 @@ class SparkPythonExecutor(val sparkEngineSession: SparkEngineSession, val id: In
       .foreach(pythonClasspath ++= File.pathSeparator ++= _)
 
     val cmd = CommandLine.parse(pythonExec)
-    cmd.addArgument(createFakeShell(pythonScriptPath).getAbsolutePath, false)
+    if (SparkConfiguration.SPARK_PYTHON_TEST_MODE_ENABLE.getValue) {
+      val path = SparkConfiguration.SPARK_PYTHON_TEST_MODE_MIX__PYSHELL_PATH.getValue
+      logger.info(
+        s"${SparkConfiguration.SPARK_PYTHON_TEST_MODE_ENABLE.key} => true, will use ${SparkConfiguration.SPARK_PYTHON_TEST_MODE_MIX__PYSHELL_PATH.key}:${path}"
+      )
+      cmd.addArgument(path, false)
+    } else {
+      cmd.addArgument(createFakeShell(pythonScriptPath).getAbsolutePath, false)
+    }
+
     cmd.addArgument(port.toString, false)
     cmd.addArgument(EngineUtils.sparkSubmitVersion().replaceAll("\\.", ""), false)
     cmd.addArgument(py4jToken, false)
     cmd.addArgument(pythonClasspath.toString(), false)
+
+    // add java process pid
+    // cmd.addArgument(Utils.getProcessId(), false)
+
     cmd.addArgument(pyFiles, false)
 
     val builder = new ProcessBuilder(cmd.toStrings.toSeq.toList.asJava)
@@ -220,13 +240,16 @@ class SparkPythonExecutor(val sparkEngineSession: SparkEngineSession, val id: In
     // add hook to shutdown python
     Utils.addShutdownHook {
       close
-      Utils.tryAndError(pid.foreach(p => Utils.exec(Array("kill", "-9", p), 3000L)))
+//      Utils.tryAndError(pid.foreach(p => {
+//        logger.info(s"try to kill Pyspark process with: [kill -9 ${p}]")
+//        Utils.exec(Array("kill", "-9", p), 3000L)
+//      }))
     }
 
     Future {
       val exitCode = process.waitFor()
       pythonScriptInitialized = false
-      logger.info("Pyspark process  has stopped with exit code " + exitCode)
+      logger.info("Pyspark process has stopped with exit code " + exitCode)
       //      close
       Utils.tryFinally({
         if (promise != null && !promise.isCompleted) {
@@ -341,6 +364,9 @@ class SparkPythonExecutor(val sparkEngineSession: SparkEngineSession, val id: In
     if (!pythonScriptInitialized) {
       logger.info(message)
     } else {
+      if (SparkConfiguration.SPARK_PYTHON_TEST_MODE_ENABLE.getValue) {
+        logger.info(message)
+      }
       lineOutputStream.write(message.getBytes("utf-8"))
     }
   }
@@ -407,6 +433,7 @@ class SparkPythonExecutor(val sparkEngineSession: SparkEngineSession, val id: In
   override protected def getExecutorIdPreFix: String = "SparkPythonExecutor_"
 
   def printLog(log: Any): Unit = {
+    logger.info(log.toString)
     if (engineExecutionContext != null) {
       engineExecutionContext.appendStdout("+++++++++++++++")
       engineExecutionContext.appendStdout(log.toString)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org