You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@linkis.apache.org by pe...@apache.org on 2022/08/22 06:54:34 UTC

[incubator-linkis] branch dev-1.3.1 updated: fix shell bug (#2923)

This is an automated email from the ASF dual-hosted git repository.

peacewong pushed a commit to branch dev-1.3.1
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git


The following commit(s) were added to refs/heads/dev-1.3.1 by this push:
     new 393fb5e39 fix shell bug (#2923)
393fb5e39 is described below

commit 393fb5e39847f41c81086661e7ae96abf0341eb8
Author: huangKai-2323 <62...@users.noreply.github.com>
AuthorDate: Mon Aug 22 14:54:28 2022 +0800

    fix shell bug (#2923)
    
    * fix shell bug
    
    * format file
---
 .../engineplugin/shell/executor/ReaderThread.scala | 20 ++++++----
 .../shell/executor/ShellEngineConnExecutor.scala   | 46 +++++++---------------
 2 files changed, 27 insertions(+), 39 deletions(-)

diff --git a/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/ReaderThread.scala b/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/ReaderThread.scala
index cc08941ed..d814dd259 100644
--- a/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/ReaderThread.scala
+++ b/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/ReaderThread.scala
@@ -25,6 +25,7 @@ import org.apache.commons.lang3.StringUtils
 
 import java.io.BufferedReader
 import java.util
+import java.util.concurrent.CountDownLatch
 
 class ReaderThread extends Thread with Logging {
   private var engineExecutionContext: EngineExecutionContext = _
@@ -32,18 +33,21 @@ class ReaderThread extends Thread with Logging {
   private var extractor: YarnAppIdExtractor = _
   private var isStdout: Boolean = false
   private val logListCount = CommonVars[Int]("wds.linkis.engineconn.log.list.count", 50)
+  private var counter: CountDownLatch = _
 
   def this(
       engineExecutionContext: EngineExecutionContext,
       inputReader: BufferedReader,
       extractor: YarnAppIdExtractor,
-      isStdout: Boolean
+      isStdout: Boolean,
+      counter: CountDownLatch
   ) {
     this()
     this.inputReader = inputReader
     this.engineExecutionContext = engineExecutionContext
     this.extractor = extractor
     this.isStdout = isStdout
+    this.counter = counter
   }
 
   def onDestroy(): Unit = {
@@ -71,27 +75,27 @@ class ReaderThread extends Thread with Logging {
     Utils.tryCatch {
       var line: String = null
       val logArray: util.List[String] = new util.ArrayList[String]
-      while ({ line = inputReader.readLine(); line != null }) {
+      while ({ line = inputReader.readLine(); line != null && !isInterrupted }) {
         logger.info("read logger line :{}", line)
         logArray.add(line)
+        extractor.appendLineToExtractor(line)
+        if (isStdout) engineExecutionContext.appendTextResultSet(line)
+
         if (logArray.size > logListCount.getValue) {
           val linelist = StringUtils.join(logArray, "\n")
-          extractor.appendLineToExtractor(linelist)
-          if (isStdout) engineExecutionContext.appendStdout(linelist)
-          engineExecutionContext.appendTextResultSet(linelist)
+          engineExecutionContext.appendStdout(linelist)
           logArray.clear()
         }
       }
       if (logArray.size > 0) {
         val linelist = StringUtils.join(logArray, "\n")
-        extractor.appendLineToExtractor(linelist)
-        if (isStdout) engineExecutionContext.appendStdout(linelist)
-        engineExecutionContext.appendTextResultSet(linelist)
+        engineExecutionContext.appendStdout(linelist)
         logArray.clear()
       }
     } { t =>
       logger.warn("inputReader reading the input stream", t)
     }
+    counter.countDown()
   }
 
 }
diff --git a/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/ShellEngineConnExecutor.scala b/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/ShellEngineConnExecutor.scala
index 5434f0174..7c93d48a1 100644
--- a/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/ShellEngineConnExecutor.scala
+++ b/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/ShellEngineConnExecutor.scala
@@ -46,6 +46,7 @@ import org.apache.commons.lang3.StringUtils
 
 import java.io.{BufferedReader, File, FileReader, InputStreamReader, IOException}
 import java.util
+import java.util.concurrent.CountDownLatch
 import java.util.concurrent.atomic.AtomicBoolean
 
 import scala.collection.JavaConverters._
@@ -177,17 +178,17 @@ class ShellEngineConnExecutor(id: Int) extends ComputationExecutor with Logging
 
       bufferedReader = new BufferedReader(new InputStreamReader(process.getInputStream))
       errorsReader = new BufferedReader(new InputStreamReader(process.getErrorStream))
-
+      val counter: CountDownLatch = new CountDownLatch(2)
       inputReaderThread =
-        new ReaderThread(engineExecutionContext, bufferedReader, extractor, true)
-      errReaderThread = new ReaderThread(engineExecutionContext, bufferedReader, extractor, false)
+        new ReaderThread(engineExecutionContext, bufferedReader, extractor, true, counter)
+      errReaderThread =
+        new ReaderThread(engineExecutionContext, bufferedReader, extractor, false, counter)
 
       inputReaderThread.start()
       errReaderThread.start()
 
       val exitCode = process.waitFor()
-      joinThread(inputReaderThread)
-      joinThread(errReaderThread)
+      counter.await()
 
       completed.set(true)
 
@@ -199,18 +200,17 @@ class ShellEngineConnExecutor(id: Int) extends ComputationExecutor with Logging
         logger.error("Execute shell code failed, reason:", e)
         ErrorExecuteResponse("run shell failed", e)
       }
-      case t: Throwable =>
-        ErrorExecuteResponse("Internal error executing shell process(执行shell进程内部错误)", t)
+
     } finally {
       if (!completed.get()) {
-        errReaderThread.interrupt()
-        joinThread(errReaderThread)
+        Utils.tryAndWarn(errReaderThread.interrupt())
+        Utils.tryAndWarn(inputReaderThread.interrupt())
+      }
+      Utils.tryAndWarn {
+        extractor.onDestroy()
+        inputReaderThread.onDestroy()
+        errReaderThread.onDestroy()
       }
-
-      extractor.onDestroy()
-      inputReaderThread.onDestroy()
-      errReaderThread.onDestroy()
-
       IOUtils.closeQuietly(bufferedReader)
       IOUtils.closeQuietly(errorsReader)
     }
@@ -226,23 +226,7 @@ class ShellEngineConnExecutor(id: Int) extends ComputationExecutor with Logging
   }
 
   private def generateRunCodeWithArgs(code: String, args: Array[String]): Array[String] = {
-    Array(
-      "sh",
-      "-c",
-      "echo \"dummy " + args.mkString(" ") + "\" | xargs sh -c \'" + code + "\'"
-    ) // pass args by pipeline
-  }
-
-  private def joinThread(thread: Thread) = {
-    while ({
-      thread.isAlive
-    }) {
-      Utils.tryCatch {
-        thread.join()
-      } { t =>
-        logger.warn("Exception thrown while joining on: " + thread, t)
-      }
-    }
+    Array("sh", "-c", "echo \"dummy " + args.mkString(" ") + "\" | xargs sh -c \'" + code + "\'")
   }
 
   override def getId(): String = Sender.getThisServiceInstance.getInstance + "_" + id


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org