You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@linkis.apache.org by pe...@apache.org on 2022/08/22 06:54:34 UTC
[incubator-linkis] branch dev-1.3.1 updated: fix shell bug (#2923)
This is an automated email from the ASF dual-hosted git repository.
peacewong pushed a commit to branch dev-1.3.1
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git
The following commit(s) were added to refs/heads/dev-1.3.1 by this push:
new 393fb5e39 fix shell bug (#2923)
393fb5e39 is described below
commit 393fb5e39847f41c81086661e7ae96abf0341eb8
Author: huangKai-2323 <62...@users.noreply.github.com>
AuthorDate: Mon Aug 22 14:54:28 2022 +0800
fix shell bug (#2923)
* fix shell bug
* format file
---
.../engineplugin/shell/executor/ReaderThread.scala | 20 ++++++----
.../shell/executor/ShellEngineConnExecutor.scala | 46 +++++++---------------
2 files changed, 27 insertions(+), 39 deletions(-)
diff --git a/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/ReaderThread.scala b/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/ReaderThread.scala
index cc08941ed..d814dd259 100644
--- a/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/ReaderThread.scala
+++ b/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/ReaderThread.scala
@@ -25,6 +25,7 @@ import org.apache.commons.lang3.StringUtils
import java.io.BufferedReader
import java.util
+import java.util.concurrent.CountDownLatch
class ReaderThread extends Thread with Logging {
private var engineExecutionContext: EngineExecutionContext = _
@@ -32,18 +33,21 @@ class ReaderThread extends Thread with Logging {
private var extractor: YarnAppIdExtractor = _
private var isStdout: Boolean = false
private val logListCount = CommonVars[Int]("wds.linkis.engineconn.log.list.count", 50)
+ private var counter: CountDownLatch = _
def this(
engineExecutionContext: EngineExecutionContext,
inputReader: BufferedReader,
extractor: YarnAppIdExtractor,
- isStdout: Boolean
+ isStdout: Boolean,
+ counter: CountDownLatch
) {
this()
this.inputReader = inputReader
this.engineExecutionContext = engineExecutionContext
this.extractor = extractor
this.isStdout = isStdout
+ this.counter = counter
}
def onDestroy(): Unit = {
@@ -71,27 +75,27 @@ class ReaderThread extends Thread with Logging {
Utils.tryCatch {
var line: String = null
val logArray: util.List[String] = new util.ArrayList[String]
- while ({ line = inputReader.readLine(); line != null }) {
+ while ({ line = inputReader.readLine(); line != null && !isInterrupted }) {
logger.info("read logger line :{}", line)
logArray.add(line)
+ extractor.appendLineToExtractor(line)
+ if (isStdout) engineExecutionContext.appendTextResultSet(line)
+
if (logArray.size > logListCount.getValue) {
val linelist = StringUtils.join(logArray, "\n")
- extractor.appendLineToExtractor(linelist)
- if (isStdout) engineExecutionContext.appendStdout(linelist)
- engineExecutionContext.appendTextResultSet(linelist)
+ engineExecutionContext.appendStdout(linelist)
logArray.clear()
}
}
if (logArray.size > 0) {
val linelist = StringUtils.join(logArray, "\n")
- extractor.appendLineToExtractor(linelist)
- if (isStdout) engineExecutionContext.appendStdout(linelist)
- engineExecutionContext.appendTextResultSet(linelist)
+ engineExecutionContext.appendStdout(linelist)
logArray.clear()
}
} { t =>
logger.warn("inputReader reading the input stream", t)
}
+ counter.countDown()
}
}
diff --git a/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/ShellEngineConnExecutor.scala b/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/ShellEngineConnExecutor.scala
index 5434f0174..7c93d48a1 100644
--- a/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/ShellEngineConnExecutor.scala
+++ b/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/ShellEngineConnExecutor.scala
@@ -46,6 +46,7 @@ import org.apache.commons.lang3.StringUtils
import java.io.{BufferedReader, File, FileReader, InputStreamReader, IOException}
import java.util
+import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.JavaConverters._
@@ -177,17 +178,17 @@ class ShellEngineConnExecutor(id: Int) extends ComputationExecutor with Logging
bufferedReader = new BufferedReader(new InputStreamReader(process.getInputStream))
errorsReader = new BufferedReader(new InputStreamReader(process.getErrorStream))
-
+ val counter: CountDownLatch = new CountDownLatch(2)
inputReaderThread =
- new ReaderThread(engineExecutionContext, bufferedReader, extractor, true)
- errReaderThread = new ReaderThread(engineExecutionContext, bufferedReader, extractor, false)
+ new ReaderThread(engineExecutionContext, bufferedReader, extractor, true, counter)
+ errReaderThread =
+ new ReaderThread(engineExecutionContext, bufferedReader, extractor, false, counter)
inputReaderThread.start()
errReaderThread.start()
val exitCode = process.waitFor()
- joinThread(inputReaderThread)
- joinThread(errReaderThread)
+ counter.await()
completed.set(true)
@@ -199,18 +200,17 @@ class ShellEngineConnExecutor(id: Int) extends ComputationExecutor with Logging
logger.error("Execute shell code failed, reason:", e)
ErrorExecuteResponse("run shell failed", e)
}
- case t: Throwable =>
- ErrorExecuteResponse("Internal error executing shell process(执行shell进程内部错误)", t)
+
} finally {
if (!completed.get()) {
- errReaderThread.interrupt()
- joinThread(errReaderThread)
+ Utils.tryAndWarn(errReaderThread.interrupt())
+ Utils.tryAndWarn(inputReaderThread.interrupt())
+ }
+ Utils.tryAndWarn {
+ extractor.onDestroy()
+ inputReaderThread.onDestroy()
+ errReaderThread.onDestroy()
}
-
- extractor.onDestroy()
- inputReaderThread.onDestroy()
- errReaderThread.onDestroy()
-
IOUtils.closeQuietly(bufferedReader)
IOUtils.closeQuietly(errorsReader)
}
@@ -226,23 +226,7 @@ class ShellEngineConnExecutor(id: Int) extends ComputationExecutor with Logging
}
private def generateRunCodeWithArgs(code: String, args: Array[String]): Array[String] = {
- Array(
- "sh",
- "-c",
- "echo \"dummy " + args.mkString(" ") + "\" | xargs sh -c \'" + code + "\'"
- ) // pass args by pipeline
- }
-
- private def joinThread(thread: Thread) = {
- while ({
- thread.isAlive
- }) {
- Utils.tryCatch {
- thread.join()
- } { t =>
- logger.warn("Exception thrown while joining on: " + thread, t)
- }
- }
+ Array("sh", "-c", "echo \"dummy " + args.mkString(" ") + "\" | xargs sh -c \'" + code + "\'")
}
override def getId(): String = Sender.getThisServiceInstance.getInstance + "_" + id
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org