You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@linkis.apache.org by pe...@apache.org on 2023/04/27 13:37:02 UTC
[linkis] branch dev-1.4.0 updated: Translate engineconn-plugins-shell service classes from Scala to Java (#4473)
This is an automated email from the ASF dual-hosted git repository.
peacewong pushed a commit to branch dev-1.4.0
in repository https://gitbox.apache.org/repos/asf/linkis.git
The following commit(s) were added to refs/heads/dev-1.4.0 by this push:
new db37eed39 Translate engineconn-plugins-shell service classes from Scala to Java (#4473)
db37eed39 is described below
commit db37eed39c3770c580e9c1a5d649c0c94064e5d2
Author: ChengJie1053 <18...@163.com>
AuthorDate: Thu Apr 27 21:36:53 2023 +0800
Translate engineconn-plugins-shell service classes from Scala to Java (#4473)
* Remove useless code
---
.../engineplugin/shell/ShellEngineConnPlugin.java | 81 +++++
.../ShellProcessEngineConnLaunchBuilder.java} | 6 +-
.../shell/common/ShellEngineConnPluginConst.java} | 9 +-
.../shell/conf/ShellEngineConnConf.java} | 24 +-
.../shell/exception/NoCorrectUserException.java} | 12 +-
.../shell/exception/ShellCodeErrorException.java} | 16 +-
.../engineplugin/shell/executor/ReaderThread.java | 97 ++++++
.../shell/executor/ShellECTaskInfo.java | 54 ++++
.../ShellEngineConnConcurrentExecutor.java | 358 +++++++++++++++++++++
.../shell/executor/ShellEngineConnExecutor.java | 330 +++++++++++++++++++
.../shell/executor/YarnAppIdExtractor.java | 53 +++
.../engineplugin/shell/ShellEngineConnPlugin.scala | 76 -----
.../shell/exception/NoCorrectUserException.scala | 27 --
.../engineplugin/shell/executor/ReaderThread.scala | 96 ------
.../ShellEngineConnConcurrentExecutor.scala | 348 --------------------
.../shell/executor/ShellEngineConnExecutor.scala | 319 ------------------
.../shell/executor/YarnAppIdExtractor.scala | 81 -----
.../shell/factory/ShellEngineConnFactory.scala | 0
.../shell/TestShellEngineConnPlugin.java} | 20 +-
.../common/TestShellEngineConnPluginConst.java} | 17 +-
.../exception/TestNoCorrectUserException.java} | 20 +-
.../executor/TestShellEngineConnExecutor.java | 61 ++++
.../executor/TestShellEngineConnExecutor.scala | 62 ----
23 files changed, 1092 insertions(+), 1075 deletions(-)
diff --git a/linkis-engineconn-plugins/shell/src/main/java/org/apache/linkis/manager/engineplugin/shell/ShellEngineConnPlugin.java b/linkis-engineconn-plugins/shell/src/main/java/org/apache/linkis/manager/engineplugin/shell/ShellEngineConnPlugin.java
new file mode 100644
index 000000000..704f1cd6e
--- /dev/null
+++ b/linkis-engineconn-plugins/shell/src/main/java/org/apache/linkis/manager/engineplugin/shell/ShellEngineConnPlugin.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.manager.engineplugin.shell;
+
+import org.apache.linkis.manager.engineplugin.common.EngineConnPlugin;
+import org.apache.linkis.manager.engineplugin.common.creation.EngineConnFactory;
+import org.apache.linkis.manager.engineplugin.common.launch.EngineConnLaunchBuilder;
+import org.apache.linkis.manager.engineplugin.common.resource.EngineResourceFactory;
+import org.apache.linkis.manager.engineplugin.common.resource.GenericEngineResourceFactory;
+import org.apache.linkis.manager.engineplugin.shell.builder.ShellProcessEngineConnLaunchBuilder;
+import org.apache.linkis.manager.engineplugin.shell.factory.ShellEngineConnFactory;
+import org.apache.linkis.manager.label.entity.Label;
+import org.apache.linkis.manager.label.entity.engine.EngineType;
+import org.apache.linkis.manager.label.utils.EngineTypeLabelCreator;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class ShellEngineConnPlugin implements EngineConnPlugin {
+ private Object resourceLocker = new Object();
+ private Object engineFactoryLocker = new Object();
+ private volatile EngineResourceFactory engineResourceFactory;
+ private volatile EngineConnFactory engineFactory;
+ private List<Label<?>> defaultLabels = new ArrayList<>();
+
+ public void init(Map<String, Object> params) {
+ Label<?> engineTypeLabel =
+ EngineTypeLabelCreator.createEngineTypeLabel(EngineType.SHELL().toString());
+ this.defaultLabels.add(engineTypeLabel);
+ }
+
+ @Override
+ public EngineResourceFactory getEngineResourceFactory() {
+ if (engineResourceFactory == null) {
+ synchronized (resourceLocker) {
+ if (engineResourceFactory == null) {
+ engineResourceFactory = new GenericEngineResourceFactory();
+ }
+ }
+ }
+ return engineResourceFactory;
+ }
+
+ @Override
+ public EngineConnLaunchBuilder getEngineConnLaunchBuilder() {
+ return new ShellProcessEngineConnLaunchBuilder();
+ }
+
+ @Override
+ public EngineConnFactory getEngineConnFactory() {
+ if (engineFactory == null) {
+ synchronized (engineFactoryLocker) {
+ if (engineFactory == null) {
+ engineFactory = new ShellEngineConnFactory();
+ }
+ }
+ }
+ return engineFactory;
+ }
+
+ @Override
+ public List<Label<?>> getDefaultLabels() {
+ return this.defaultLabels;
+ }
+}
diff --git a/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/builder/ShellProcessEngineConnLaunchBuilder.scala b/linkis-engineconn-plugins/shell/src/main/java/org/apache/linkis/manager/engineplugin/shell/builder/ShellProcessEngineConnLaunchBuilder.java
similarity index 81%
rename from linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/builder/ShellProcessEngineConnLaunchBuilder.scala
rename to linkis-engineconn-plugins/shell/src/main/java/org/apache/linkis/manager/engineplugin/shell/builder/ShellProcessEngineConnLaunchBuilder.java
index 23668f32a..c030db4e2 100644
--- a/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/builder/ShellProcessEngineConnLaunchBuilder.scala
+++ b/linkis-engineconn-plugins/shell/src/main/java/org/apache/linkis/manager/engineplugin/shell/builder/ShellProcessEngineConnLaunchBuilder.java
@@ -15,8 +15,8 @@
* limitations under the License.
*/
-package org.apache.linkis.manager.engineplugin.shell.builder
+package org.apache.linkis.manager.engineplugin.shell.builder;
-import org.apache.linkis.manager.engineplugin.common.launch.process.JavaProcessEngineConnLaunchBuilder
+import org.apache.linkis.manager.engineplugin.common.launch.process.JavaProcessEngineConnLaunchBuilder;
-class ShellProcessEngineConnLaunchBuilder extends JavaProcessEngineConnLaunchBuilder {}
+public class ShellProcessEngineConnLaunchBuilder extends JavaProcessEngineConnLaunchBuilder {}
diff --git a/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/common/ShellEnginePluginConst.scala b/linkis-engineconn-plugins/shell/src/main/java/org/apache/linkis/manager/engineplugin/shell/common/ShellEngineConnPluginConst.java
similarity index 73%
rename from linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/common/ShellEnginePluginConst.scala
rename to linkis-engineconn-plugins/shell/src/main/java/org/apache/linkis/manager/engineplugin/shell/common/ShellEngineConnPluginConst.java
index 34771e68b..30fd9cf68 100644
--- a/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/common/ShellEnginePluginConst.scala
+++ b/linkis-engineconn-plugins/shell/src/main/java/org/apache/linkis/manager/engineplugin/shell/common/ShellEngineConnPluginConst.java
@@ -15,9 +15,10 @@
* limitations under the License.
*/
-package org.apache.linkis.manager.engineplugin.shell.common
+package org.apache.linkis.manager.engineplugin.shell.common;
-object ShellEngineConnPluginConst {
- final val RUNTIME_ARGS_KEY: String = "extraArguments"
- final val SHELL_RUNTIME_WORKING_DIRECTORY: String = "wds.linkis.shell.runtime.working.directory"
+public class ShellEngineConnPluginConst {
+ public static final String RUNTIME_ARGS_KEY = "extraArguments";
+ public static final String SHELL_RUNTIME_WORKING_DIRECTORY =
+ "wds.linkis.shell.runtime.working.directory";
}
diff --git a/linkis-engineconn-plugins/shell/src/test/scala/org/apache/linkis/manager/engineplugin/shell/exception/TestNoCorrectUserException.scala b/linkis-engineconn-plugins/shell/src/main/java/org/apache/linkis/manager/engineplugin/shell/conf/ShellEngineConnConf.java
similarity index 65%
copy from linkis-engineconn-plugins/shell/src/test/scala/org/apache/linkis/manager/engineplugin/shell/exception/TestNoCorrectUserException.scala
copy to linkis-engineconn-plugins/shell/src/main/java/org/apache/linkis/manager/engineplugin/shell/conf/ShellEngineConnConf.java
index 50ba61bb0..3849bc1ee 100644
--- a/linkis-engineconn-plugins/shell/src/test/scala/org/apache/linkis/manager/engineplugin/shell/exception/TestNoCorrectUserException.scala
+++ b/linkis-engineconn-plugins/shell/src/main/java/org/apache/linkis/manager/engineplugin/shell/conf/ShellEngineConnConf.java
@@ -15,24 +15,14 @@
* limitations under the License.
*/
-package org.apache.linkis.manager.engineplugin.shell.exception
+package org.apache.linkis.manager.engineplugin.shell.conf;
-import org.junit.jupiter.api.{Assertions, Test}
+import org.apache.linkis.common.conf.CommonVars;
-class TestNoCorrectUserException {
-
- @Test
- def testNoCorrectUserException: Unit = {
-
- val exception = NoCorrectUserException
- Assertions.assertNotNull(exception)
- }
-
- @Test
- def testShellCodeErrorException: Unit = {
-
- val exception = ShellCodeErrorException
- Assertions.assertNotNull(exception)
- }
+public class ShellEngineConnConf {
+ public static final int SHELL_ENGINECONN_CONCURRENT_LIMIT =
+ CommonVars.apply("linkis.engineconn.shell.concurrent.limit", 30).getValue();
+ public static final int LOG_SERVICE_MAX_THREAD_SIZE =
+ CommonVars.apply("linkis.engineconn.shell.log.max.thread.size", 50).getValue();
}
diff --git a/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/ShellECTaskInfo.scala b/linkis-engineconn-plugins/shell/src/main/java/org/apache/linkis/manager/engineplugin/shell/exception/NoCorrectUserException.java
similarity index 65%
rename from linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/ShellECTaskInfo.scala
rename to linkis-engineconn-plugins/shell/src/main/java/org/apache/linkis/manager/engineplugin/shell/exception/NoCorrectUserException.java
index d8d7edaa3..6c5d106f6 100644
--- a/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/ShellECTaskInfo.scala
+++ b/linkis-engineconn-plugins/shell/src/main/java/org/apache/linkis/manager/engineplugin/shell/exception/NoCorrectUserException.java
@@ -15,6 +15,14 @@
* limitations under the License.
*/
-package org.apache.linkis.manager.engineplugin.shell.executor
+package org.apache.linkis.manager.engineplugin.shell.exception;
-case class ShellECTaskInfo(taskId: String, process: Process, yarnAppIdExtractor: YarnAppIdExtractor)
+import org.apache.linkis.common.exception.ErrorException;
+
+import static org.apache.linkis.manager.engineplugin.shell.errorcode.LinkisCommonsErrorCodeSummary.*;
+
+public class NoCorrectUserException extends ErrorException {
+ public NoCorrectUserException() {
+ super(NO_ILLEGAL_USER_HOLDS.getErrorCode(), NO_ILLEGAL_USER_HOLDS.getErrorDesc());
+ }
+}
diff --git a/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/conf/ShellEngineConnConf.scala b/linkis-engineconn-plugins/shell/src/main/java/org/apache/linkis/manager/engineplugin/shell/exception/ShellCodeErrorException.java
similarity index 66%
rename from linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/conf/ShellEngineConnConf.scala
rename to linkis-engineconn-plugins/shell/src/main/java/org/apache/linkis/manager/engineplugin/shell/exception/ShellCodeErrorException.java
index ba74dbbad..49b41ad03 100644
--- a/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/conf/ShellEngineConnConf.scala
+++ b/linkis-engineconn-plugins/shell/src/main/java/org/apache/linkis/manager/engineplugin/shell/exception/ShellCodeErrorException.java
@@ -15,16 +15,14 @@
* limitations under the License.
*/
-package org.apache.linkis.manager.engineplugin.shell.conf
+package org.apache.linkis.manager.engineplugin.shell.exception;
-import org.apache.linkis.common.conf.CommonVars
+import org.apache.linkis.common.exception.ErrorException;
-object ShellEngineConnConf {
-
- val SHELL_ENGINECONN_CONCURRENT_LIMIT: Int =
- CommonVars[Int]("linkis.engineconn.shell.concurrent.limit", 30).getValue
-
- val LOG_SERVICE_MAX_THREAD_SIZE: Int =
- CommonVars("linkis.engineconn.shell.log.max.thread.size", 50).getValue
+import static org.apache.linkis.manager.engineplugin.shell.errorcode.LinkisCommonsErrorCodeSummary.*;
+public class ShellCodeErrorException extends ErrorException {
+ public ShellCodeErrorException() {
+ super(SHELL_CODE_IS_WRONG.getErrorCode(), SHELL_CODE_IS_WRONG.getErrorDesc());
+ }
}
diff --git a/linkis-engineconn-plugins/shell/src/main/java/org/apache/linkis/manager/engineplugin/shell/executor/ReaderThread.java b/linkis-engineconn-plugins/shell/src/main/java/org/apache/linkis/manager/engineplugin/shell/executor/ReaderThread.java
new file mode 100644
index 000000000..8b3e7cad5
--- /dev/null
+++ b/linkis-engineconn-plugins/shell/src/main/java/org/apache/linkis/manager/engineplugin/shell/executor/ReaderThread.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.manager.engineplugin.shell.executor;
+
+import org.apache.linkis.common.conf.CommonVars;
+import org.apache.linkis.engineconn.computation.executor.execute.EngineExecutionContext;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Drains one output stream of a shell process, line by line.
+ *
+ * <p>Each line is handed to the {@link YarnAppIdExtractor}, optionally appended to the task's text
+ * result set, and buffered; the buffer is forwarded through {@code appendStdout} once it holds
+ * more than {@code wds.linkis.engineconn.log.list.count} lines and again when the stream ends.
+ * The latch passed to the constructor is counted down exactly once when {@link #run()} finishes.
+ */
+public class ReaderThread extends Thread {
+  private static final Logger logger = LoggerFactory.getLogger(ReaderThread.class);
+
+  private final EngineExecutionContext engineExecutionContext;
+  private final BufferedReader inputReader;
+  private final YarnAppIdExtractor extractor;
+  private final boolean isStdout;
+  private final int logListCount =
+      CommonVars.apply("wds.linkis.engineconn.log.list.count", 50).getValue();
+  private final CountDownLatch counter;
+
+  // Cleared by onDestroy() from another thread, so the write must be visible to run().
+  private volatile boolean isReaderAlive = true;
+
+  /**
+   * @param engineExecutionContext context that receives the lines read
+   * @param inputReader stream to drain; closed when {@link #run()} finishes
+   * @param extractor receives every line read
+   * @param isStdout when {@code true}, each line is also appended to the text result set
+   * @param counter latch counted down once when reading has finished
+   */
+  public ReaderThread(
+      EngineExecutionContext engineExecutionContext,
+      BufferedReader inputReader,
+      YarnAppIdExtractor extractor,
+      boolean isStdout,
+      CountDownLatch counter) {
+    this.engineExecutionContext = engineExecutionContext;
+    this.inputReader = inputReader;
+    this.extractor = extractor;
+    this.isStdout = isStdout;
+    this.counter = counter;
+  }
+
+  /** Asks the read loop to stop; it exits after the read that is currently in progress. */
+  public void onDestroy() {
+    isReaderAlive = false;
+  }
+
+  @Override
+  public void run() {
+    List<String> logArray = new ArrayList<>();
+    try {
+      while (true) {
+        String line;
+        try {
+          line = inputReader.readLine();
+        } catch (IOException e) {
+          logger.warn("inputReader reading the input stream", e);
+          break;
+        }
+        if (line == null || !isReaderAlive) {
+          break;
+        }
+        logger.info("read logger line :{}", line);
+        logArray.add(line);
+        extractor.appendLineToExtractor(line);
+        if (isStdout) {
+          engineExecutionContext.appendTextResultSet(line);
+        }
+        if (logArray.size() > logListCount) {
+          flush(logArray);
+        }
+      }
+      if (!logArray.isEmpty()) {
+        flush(logArray);
+      }
+    } finally {
+      // Always release the stream and the latch, even if a consumer above threw, otherwise
+      // the thread awaiting the latch would block forever.
+      IOUtils.closeQuietly(inputReader);
+      counter.countDown();
+    }
+  }
+
+  /** Forwards the buffered lines as one newline-joined chunk and empties the buffer. */
+  private void flush(List<String> logArray) {
+    String linelist = StringUtils.join(logArray, "\n");
+    engineExecutionContext.appendStdout(linelist);
+    logArray.clear();
+  }
+}
diff --git a/linkis-engineconn-plugins/shell/src/main/java/org/apache/linkis/manager/engineplugin/shell/executor/ShellECTaskInfo.java b/linkis-engineconn-plugins/shell/src/main/java/org/apache/linkis/manager/engineplugin/shell/executor/ShellECTaskInfo.java
new file mode 100644
index 000000000..dc843327d
--- /dev/null
+++ b/linkis-engineconn-plugins/shell/src/main/java/org/apache/linkis/manager/engineplugin/shell/executor/ShellECTaskInfo.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.manager.engineplugin.shell.executor;
+
+/**
+ * Mutable holder tying a task id to the shell {@link Process} that runs it and to the {@link
+ * YarnAppIdExtractor} attached to that task.
+ */
+public class ShellECTaskInfo {
+  private String taskId;
+  private Process process;
+  private YarnAppIdExtractor yarnAppIdExtractor;
+
+  /**
+   * @param taskId id of the task
+   * @param process process started for the task
+   * @param yarnAppIdExtractor extractor associated with the task
+   */
+  public ShellECTaskInfo(String taskId, Process process, YarnAppIdExtractor yarnAppIdExtractor) {
+    this.taskId = taskId;
+    this.process = process;
+    this.yarnAppIdExtractor = yarnAppIdExtractor;
+  }
+
+  // ---- accessors ----
+
+  /** @return the task id */
+  public String getTaskId() {
+    return this.taskId;
+  }
+
+  /** @return the process of the task */
+  public Process getProcess() {
+    return this.process;
+  }
+
+  /** @return the extractor of the task */
+  public YarnAppIdExtractor getYarnAppIdExtractor() {
+    return this.yarnAppIdExtractor;
+  }
+
+  // ---- mutators ----
+
+  /** @param taskId the new task id */
+  public void setTaskId(String taskId) {
+    this.taskId = taskId;
+  }
+
+  /** @param process the new process */
+  public void setProcess(Process process) {
+    this.process = process;
+  }
+
+  /** @param yarnAppIdExtractor the new extractor */
+  public void setYarnAppIdExtractor(YarnAppIdExtractor yarnAppIdExtractor) {
+    this.yarnAppIdExtractor = yarnAppIdExtractor;
+  }
+}
diff --git a/linkis-engineconn-plugins/shell/src/main/java/org/apache/linkis/manager/engineplugin/shell/executor/ShellEngineConnConcurrentExecutor.java b/linkis-engineconn-plugins/shell/src/main/java/org/apache/linkis/manager/engineplugin/shell/executor/ShellEngineConnConcurrentExecutor.java
new file mode 100644
index 000000000..3f445ddd5
--- /dev/null
+++ b/linkis-engineconn-plugins/shell/src/main/java/org/apache/linkis/manager/engineplugin/shell/executor/ShellEngineConnConcurrentExecutor.java
@@ -0,0 +1,358 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.manager.engineplugin.shell.executor;
+
+import org.apache.linkis.common.utils.Utils;
+import org.apache.linkis.engineconn.computation.executor.execute.ConcurrentComputationExecutor;
+import org.apache.linkis.engineconn.computation.executor.execute.EngineExecutionContext;
+import org.apache.linkis.engineconn.core.EngineConnObject;
+import org.apache.linkis.governance.common.utils.GovernanceUtils;
+import org.apache.linkis.manager.common.entity.resource.CommonNodeResource;
+import org.apache.linkis.manager.common.entity.resource.NodeResource;
+import org.apache.linkis.manager.engineplugin.common.util.NodeResourceUtils;
+import org.apache.linkis.manager.engineplugin.shell.common.ShellEngineConnPluginConst;
+import org.apache.linkis.manager.engineplugin.shell.conf.ShellEngineConnConf;
+import org.apache.linkis.manager.engineplugin.shell.exception.ShellCodeErrorException;
+import org.apache.linkis.manager.label.entity.Label;
+import org.apache.linkis.protocol.engine.JobProgressInfo;
+import org.apache.linkis.rpc.Sender;
+import org.apache.linkis.scheduler.executer.ErrorExecuteResponse;
+import org.apache.linkis.scheduler.executer.ExecuteResponse;
+import org.apache.linkis.scheduler.executer.SuccessExecuteResponse;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.InputStreamReader;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import scala.concurrent.ExecutionContextExecutorService;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Shell executor built on {@link ConcurrentComputationExecutor}.
+ *
+ * <p>Every call to {@link #executeLine} starts its own {@code sh -c} process, streams the
+ * process's stdout and stderr through two {@link ReaderThread}s, and records the process in a
+ * per-task cache so that {@link #killTask(String)} can terminate it together with the YARN
+ * applications found in its output.
+ */
+public class ShellEngineConnConcurrentExecutor extends ConcurrentComputationExecutor {
+  private static final Logger logger =
+      LoggerFactory.getLogger(ShellEngineConnConcurrentExecutor.class);
+
+  /** Context of the most recently submitted task; read by the progress methods. */
+  private EngineExecutionContext engineExecutionContext;
+
+  private final List<Label<?>> executorLabels = new ArrayList<>();
+
+  /** Running tasks keyed by task id; entries are removed when the task ends or is killed. */
+  private final Map<String, ShellECTaskInfo> shellECTaskInfoCache = new ConcurrentHashMap<>();
+
+  private final int id;
+  private final int maxRunningNumber;
+
+  /** Pool on which the stdout/stderr readers of every task are run. */
+  private final ExecutionContextExecutorService logAsyncService =
+      Utils.newCachedExecutionContext(
+          ShellEngineConnConf.LOG_SERVICE_MAX_THREAD_SIZE, "ShelLogService-Thread-", true);
+
+  /**
+   * @param id numeric suffix used in {@link #getId()}
+   * @param maxRunningNumber value passed to the super constructor and returned by {@link
+   *     #getConcurrentLimit()}
+   */
+  public ShellEngineConnConcurrentExecutor(int id, int maxRunningNumber) {
+    super(maxRunningNumber);
+    this.id = id;
+    this.maxRunningNumber = maxRunningNumber;
+  }
+
+  @Override
+  public void init() {
+    logger.info("Ready to change engine state!");
+    super.init();
+  }
+
+  /** Prepends {@code completedLine} to {@code code} and runs the result via {@link #executeLine}. */
+  @Override
+  public ExecuteResponse executeCompletely(
+      EngineExecutionContext engineExecutionContext, String code, String completedLine) {
+    String newcode = completedLine + code;
+    logger.debug("newcode is " + newcode);
+    return executeLine(engineExecutionContext, newcode);
+  }
+
+  /**
+   * Runs {@code code} in a new {@code sh -c} process and blocks until the process has exited and
+   * both of its output streams have been drained.
+   *
+   * @param engineExecutionContext context of the task; supplies the job id, properties and output
+   *     sinks
+   * @param code shell code to run
+   * @return a success response for exit code 0; otherwise an error response (also returned when
+   *     the context or job id is missing, or when starting/running the process throws)
+   */
+  @Override
+  public ExecuteResponse executeLine(EngineExecutionContext engineExecutionContext, String code) {
+    if (engineExecutionContext == null) {
+      // Nothing below can work without a context; report it instead of failing with an NPE.
+      return new ErrorExecuteResponse("engineExecutionContext is null", null);
+    }
+    this.engineExecutionContext = engineExecutionContext;
+    logger.info("Shell executor reset new engineExecutionContext!");
+
+    if (engineExecutionContext.getJobId().isEmpty()) {
+      return new ErrorExecuteResponse("taskID is null", null);
+    }
+
+    String taskId = engineExecutionContext.getJobId().get();
+    ReaderThread errReaderThread = null;
+    ReaderThread inputReaderThread = null;
+
+    try {
+      engineExecutionContext.appendStdout(getId() + " >> " + code.trim());
+
+      String[] argsArr = resolveRuntimeArgs(engineExecutionContext);
+      String workingDirectory = resolveWorkingDirectory(engineExecutionContext);
+
+      String[] generatedCode =
+          argsArr == null || argsArr.length == 0
+              ? generateRunCode(code)
+              : generateRunCodeWithArgs(code, argsArr);
+
+      ProcessBuilder processBuilder = new ProcessBuilder(generatedCode);
+      if (StringUtils.isNotBlank(workingDirectory)) {
+        processBuilder.directory(new File(workingDirectory));
+      }
+
+      // stdout and stderr stay separate: only stdout lines go into the text result set.
+      processBuilder.redirectErrorStream(false);
+      YarnAppIdExtractor extractor = new YarnAppIdExtractor();
+      Process process = processBuilder.start();
+      BufferedReader bufferedReader =
+          new BufferedReader(new InputStreamReader(process.getInputStream()));
+      BufferedReader errorsReader =
+          new BufferedReader(new InputStreamReader(process.getErrorStream()));
+
+      // add task id and task Info cache
+      shellECTaskInfoCache.put(taskId, new ShellECTaskInfo(taskId, process, extractor));
+
+      CountDownLatch counter = new CountDownLatch(2);
+      inputReaderThread =
+          new ReaderThread(engineExecutionContext, bufferedReader, extractor, true, counter);
+      errReaderThread =
+          new ReaderThread(engineExecutionContext, errorsReader, extractor, false, counter);
+
+      logAsyncService.execute(inputReaderThread);
+      logAsyncService.execute(errReaderThread);
+
+      int exitCode = process.waitFor();
+      // Wait for both readers so no trailing output is lost.
+      counter.await();
+
+      if (exitCode != 0) {
+        return new ErrorExecuteResponse("run shell failed", new ShellCodeErrorException());
+      } else {
+        return new SuccessExecuteResponse();
+      }
+    } catch (Exception e) {
+      logger.error("Execute shell code failed, reason:", e);
+      return new ErrorExecuteResponse("run shell failed", e);
+    } finally {
+      // Guard on the thread references themselves: either may still be null if start-up failed.
+      if (errReaderThread != null) {
+        errReaderThread.onDestroy();
+      }
+      if (inputReaderThread != null) {
+        inputReaderThread.onDestroy();
+      }
+      shellECTaskInfoCache.remove(taskId);
+    }
+  }
+
+  /**
+   * Reads the user-specified runtime arguments from the task properties.
+   *
+   * @return the arguments, or {@code null} unless the task has exactly one paragraph and its
+   *     properties contain a non-null {@code RUNTIME_ARGS_KEY} entry
+   */
+  private String[] resolveRuntimeArgs(EngineExecutionContext engineExecutionContext) {
+    Map<?, ?> properties = engineExecutionContext.getProperties();
+    if (engineExecutionContext.getTotalParagraph() != 1
+        || properties == null
+        || !properties.containsKey(ShellEngineConnPluginConst.RUNTIME_ARGS_KEY)) {
+      return null;
+    }
+    // Accept any List implementation, not only ArrayList.
+    @SuppressWarnings("unchecked")
+    List<String> argsList =
+        (List<String>) properties.get(ShellEngineConnPluginConst.RUNTIME_ARGS_KEY);
+    if (argsList == null) {
+      return null;
+    }
+    String[] argsArr = argsList.toArray(new String[0]);
+    logger.info(
+        "Will execute shell task with user-specified arguments: '{}'",
+        StringUtils.join(argsArr, "' '"));
+    return argsArr;
+  }
+
+  /**
+   * Reads the user-specified working directory from the task properties.
+   *
+   * @return the directory, or {@code null} unless the task has exactly one paragraph, its
+   *     properties contain {@code SHELL_RUNTIME_WORKING_DIRECTORY}, and that path is an existing
+   *     directory
+   */
+  private String resolveWorkingDirectory(EngineExecutionContext engineExecutionContext) {
+    Map<?, ?> properties = engineExecutionContext.getProperties();
+    if (engineExecutionContext.getTotalParagraph() != 1
+        || properties == null
+        || !properties.containsKey(ShellEngineConnPluginConst.SHELL_RUNTIME_WORKING_DIRECTORY)) {
+      return null;
+    }
+    String wdStr = (String) properties.get(ShellEngineConnPluginConst.SHELL_RUNTIME_WORKING_DIRECTORY);
+    if (isExecutePathExist(wdStr)) {
+      logger.info("Will execute shell task under user-specified working-directory: '{}'", wdStr);
+      return wdStr;
+    }
+    logger.warn(
+        "User-specified working-directory: '{}' does not exist or user does not have access permission. Will execute shell task under default working-directory. Please contact the administrator!",
+        wdStr);
+    return null;
+  }
+
+  /** Returns {@code true} when {@code executePath} is non-null and names an existing directory. */
+  private boolean isExecutePathExist(String executePath) {
+    if (executePath == null) {
+      return false;
+    }
+    File etlHomeDir = new File(executePath);
+    return (etlHomeDir.exists() && etlHomeDir.isDirectory());
+  }
+
+  /** Builds the command line that runs {@code code} through {@code sh -c}. */
+  private String[] generateRunCode(String code) {
+    return new String[] {"sh", "-c", code};
+  }
+
+  /**
+   * Builds the command line that runs {@code code} through {@code sh -c} with {@code args} passed
+   * in via {@code xargs}.
+   *
+   * <p>NOTE(review): {@code code} is embedded between single quotes and {@code args} between
+   * double quotes without escaping, so input containing those quote characters changes the
+   * command that is run. Left as is because callers depend on the current command shape.
+   */
+  private String[] generateRunCodeWithArgs(String code, String[] args) {
+    return new String[] {
+      "sh",
+      "-c",
+      "echo \"dummy " + StringUtils.join(args, " ") + "\" | xargs sh -c \'" + code + "\'"
+    };
+  }
+
+  /** @return this service instance's name followed by {@code _} and the executor's numeric id */
+  @Override
+  public String getId() {
+    return Sender.getThisServiceInstance().getInstance() + "_" + id;
+  }
+
+  /**
+   * Reports a single progress entry for the most recently submitted task.
+   *
+   * @param taskID passed on to {@link #progress(String)}
+   * @return an empty array when no task has been submitted yet, otherwise one entry whose counters
+   *     differ depending on whether progress is still {@code 0.0}
+   */
+  @Override
+  public JobProgressInfo[] getProgressInfo(String taskID) {
+    List<JobProgressInfo> jobProgressInfos = new ArrayList<>();
+    if (this.engineExecutionContext == null) {
+      return jobProgressInfos.toArray(new JobProgressInfo[0]);
+    }
+
+    String jobId =
+        engineExecutionContext.getJobId().isDefined()
+            ? engineExecutionContext.getJobId().get()
+            : "";
+    if (progress(taskID) == 0.0f) {
+      jobProgressInfos.add(new JobProgressInfo(jobId, 1, 1, 0, 0));
+    } else {
+      jobProgressInfos.add(new JobProgressInfo(jobId, 1, 0, 0, 1));
+    }
+    return jobProgressInfos.toArray(new JobProgressInfo[0]);
+  }
+
+  /**
+   * @param taskID not used by this implementation
+   * @return current paragraph divided by total paragraphs of the most recent context, or {@code
+   *     0.0} when no task has been submitted yet
+   */
+  @Override
+  public float progress(String taskID) {
+    if (this.engineExecutionContext != null) {
+      return ((float) this.engineExecutionContext.getCurrentParagraph())
+          / this.engineExecutionContext.getTotalParagraph();
+    } else {
+      return 0.0f;
+    }
+  }
+
+  @Override
+  public boolean supportCallBackLogs() {
+    // todo
+    return true;
+  }
+
+  /** Not implemented for the shell engine; always returns {@code null}. */
+  @Override
+  public NodeResource requestExpectedResource(NodeResource expectedResource) {
+    return null;
+  }
+
+  /**
+   * @return a new resource whose used part is derived from the engine creation options via {@code
+   *     NodeResourceUtils.applyAsLoadInstanceResource}
+   */
+  @Override
+  public NodeResource getCurrentNodeResource() {
+    CommonNodeResource resource = new CommonNodeResource();
+    resource.setUsedResource(
+        NodeResourceUtils.applyAsLoadInstanceResource(
+            EngineConnObject.getEngineCreationContext().getOptions()));
+    return resource;
+  }
+
+  @Override
+  public List<Label<?>> getExecutorLabels() {
+    return executorLabels;
+  }
+
+  /** Replaces the executor labels with {@code labels}; a {@code null} argument is ignored. */
+  @Override
+  public void setExecutorLabels(List<Label<?>> labels) {
+    if (labels != null) {
+      executorLabels.clear();
+      executorLabels.addAll(labels);
+    }
+  }
+
+  /**
+   * Kills the shell process of {@code taskID} and the YARN applications extracted from its
+   * output, then delegates to the super implementation. Does nothing for an unknown task id.
+   */
+  @Override
+  public void killTask(String taskID) {
+    ShellECTaskInfo shellECTaskInfo = shellECTaskInfoCache.remove(taskID);
+    if (shellECTaskInfo == null) {
+      return;
+    }
+
+    /*
+     Kill sub-processes
+    */
+    int pid = getPid(shellECTaskInfo.getProcess());
+    if (pid > 0) {
+      GovernanceUtils.killProcess(String.valueOf(pid), "kill task " + taskID + " process", false);
+    } else {
+      // Never hand an invalid pid to the kill helper; fall back to the process handle.
+      logger.warn("No valid pid for task {}, destroying the shell process directly", taskID);
+      shellECTaskInfo.getProcess().destroy();
+    }
+
+    /*
+     Kill yarn-applications
+    */
+    List<String> yarnAppIds = shellECTaskInfo.getYarnAppIdExtractor().getExtractedYarnAppIds();
+    GovernanceUtils.killYarnJobApp(yarnAppIds);
+    logger.info(
+        "Finished kill yarn app ids in the engine of ({}). The YARN app ids are {}.",
+        getId(),
+        yarnAppIds);
+    super.killTask(taskID);
+  }
+
+  /**
+   * Looks up the OS process id of {@code process}.
+   *
+   * @return the pid, or {@code -1} when it cannot be determined
+   */
+  private int getPid(Process process) {
+    try {
+      // Process#pid() exists from Java 9 on; it is looked up reflectively so that this class
+      // still compiles against Java 8.
+      Object pid = Process.class.getMethod("pid").invoke(process);
+      return ((Number) pid).intValue();
+    } catch (Exception ignored) {
+      // Method missing or unsupported: fall through to the Java 8 field lookup below.
+    }
+    try {
+      Class<?> clazz = Class.forName("java.lang.UNIXProcess");
+      Field field = clazz.getDeclaredField("pid");
+      field.setAccessible(true);
+      return field.getInt(process);
+    } catch (Exception e) {
+      logger.warn("Failed to acquire pid for shell process", e);
+      return -1;
+    }
+  }
+
+  /** Kills all running tasks, shuts the reader pool down and closes the super executor. */
+  @Override
+  public void close() {
+    try {
+      killAll();
+      logAsyncService.shutdown();
+    } catch (Exception e) {
+      logger.error("Shell ec failed to close ", e);
+    }
+    super.close();
+  }
+
+  /** Calls {@link #killTask(String)} for every task currently in the cache. */
+  @Override
+  public void killAll() {
+    // killTask removes entries while we iterate; ConcurrentHashMap views tolerate that.
+    for (ShellECTaskInfo shellECTaskInfo : shellECTaskInfoCache.values()) {
+      killTask(shellECTaskInfo.getTaskId());
+    }
+  }
+
+  @Override
+  public int getConcurrentLimit() {
+    return maxRunningNumber;
+  }
+}
diff --git a/linkis-engineconn-plugins/shell/src/main/java/org/apache/linkis/manager/engineplugin/shell/executor/ShellEngineConnExecutor.java b/linkis-engineconn-plugins/shell/src/main/java/org/apache/linkis/manager/engineplugin/shell/executor/ShellEngineConnExecutor.java
new file mode 100644
index 000000000..964417765
--- /dev/null
+++ b/linkis-engineconn-plugins/shell/src/main/java/org/apache/linkis/manager/engineplugin/shell/executor/ShellEngineConnExecutor.java
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.manager.engineplugin.shell.executor;
+
+import org.apache.linkis.engineconn.computation.executor.execute.ComputationExecutor;
+import org.apache.linkis.engineconn.computation.executor.execute.EngineExecutionContext;
+import org.apache.linkis.engineconn.core.EngineConnObject;
+import org.apache.linkis.governance.common.utils.GovernanceUtils;
+import org.apache.linkis.manager.common.entity.resource.CommonNodeResource;
+import org.apache.linkis.manager.common.entity.resource.NodeResource;
+import org.apache.linkis.manager.engineplugin.common.util.NodeResourceUtils;
+import org.apache.linkis.manager.engineplugin.shell.common.ShellEngineConnPluginConst;
+import org.apache.linkis.manager.engineplugin.shell.exception.ShellCodeErrorException;
+import org.apache.linkis.manager.label.entity.Label;
+import org.apache.linkis.protocol.engine.JobProgressInfo;
+import org.apache.linkis.rpc.Sender;
+import org.apache.linkis.scheduler.executer.ErrorExecuteResponse;
+import org.apache.linkis.scheduler.executer.ExecuteResponse;
+import org.apache.linkis.scheduler.executer.SuccessExecuteResponse;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.InputStreamReader;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+ /**
+  * Executor that runs submitted code through {@code sh -c} in a child process.
+  *
+  * <p>Standard output and standard error of the child are consumed by two {@link ReaderThread}s,
+  * which also feed a {@link YarnAppIdExtractor}. The ids collected by the extractor are killed
+  * together with the process in {@link #killTask(String)}.
+  *
+  * <p>Only the most recently started process and extractor are remembered, so kill and close
+  * always act on the latest task.
+  */
+ public class ShellEngineConnExecutor extends ComputationExecutor {
+   private static final Logger logger = LoggerFactory.getLogger(ShellEngineConnExecutor.class);
+
+   private final int id;
+   private final List<Label<?>> executorLabels = new ArrayList<>();
+
+   // These three are written by the thread running executeLine. killTask is only useful when it
+   // is called from another thread while executeLine is blocked waiting for the child process,
+   // so the fields are volatile to make the latest values visible there.
+   private volatile EngineExecutionContext engineExecutionContext;
+   private volatile Process process;
+   private volatile YarnAppIdExtractor extractor;
+
+   /**
+    * Creates the executor.
+    *
+    * @param id numeric suffix of this executor's id, see {@link #getId()}
+    */
+   public ShellEngineConnExecutor(int id) {
+     // NOTE(review): id is also passed to the ComputationExecutor constructor - confirm that the
+     // base class really expects the executor id in that parameter.
+     super(id);
+     this.id = id;
+   }
+
+   @Override
+   public void init() {
+     logger.info("Ready to change engine state!");
+     super.init();
+   }
+
+   /**
+    * Prepends {@code completedLine} to {@code code} and executes the result via {@link
+    * #executeLine(EngineExecutionContext, String)}.
+    */
+   @Override
+   public ExecuteResponse executeCompletely(
+       EngineExecutionContext engineExecutionContext, String code, String completedLine) {
+     final String newcode = completedLine + code;
+     logger.debug("newcode is {}", newcode);
+     return executeLine(engineExecutionContext, newcode);
+   }
+
+   /**
+    * Runs {@code code} in a child {@code sh} process and blocks until the process has exited and
+    * both of its output streams have been drained.
+    *
+    * @param engineExecutionContext context that receives the output; also consulted for optional
+    *     runtime arguments and working directory
+    * @param code the shell code to run
+    * @return {@link SuccessExecuteResponse} when the process exits with 0, otherwise an {@link
+    *     ErrorExecuteResponse}; any exception raised while launching or waiting is reported as an
+    *     {@link ErrorExecuteResponse} as well
+    */
+   @Override
+   public ExecuteResponse executeLine(EngineExecutionContext engineExecutionContext, String code) {
+     if (engineExecutionContext != null) {
+       this.engineExecutionContext = engineExecutionContext;
+       logger.info("Shell executor reset new engineExecutionContext!");
+     }
+
+     BufferedReader bufferedReader = null;
+     BufferedReader errorsReader = null;
+     ReaderThread errReaderThread = null;
+     ReaderThread inputReaderThread = null;
+
+     try {
+       engineExecutionContext.appendStdout(getId() + " >> " + code.trim());
+
+       String[] argsArr = readRuntimeArgs(engineExecutionContext);
+       String workingDirectory = resolveWorkingDirectory(engineExecutionContext);
+
+       String[] generatedCode =
+           argsArr.length == 0 ? generateRunCode(code) : generateRunCodeWithArgs(code, argsArr);
+
+       ProcessBuilder processBuilder = new ProcessBuilder(generatedCode);
+       if (StringUtils.isNotBlank(workingDirectory)) {
+         processBuilder.directory(new File(workingDirectory));
+       }
+
+       processBuilder.redirectErrorStream(false);
+       YarnAppIdExtractor taskExtractor = new YarnAppIdExtractor();
+       extractor = taskExtractor;
+       Process taskProcess = processBuilder.start();
+       process = taskProcess;
+
+       bufferedReader = new BufferedReader(new InputStreamReader(taskProcess.getInputStream()));
+       errorsReader = new BufferedReader(new InputStreamReader(taskProcess.getErrorStream()));
+       // One count per reader thread: the latch opens once both streams are fully consumed.
+       CountDownLatch counter = new CountDownLatch(2);
+       inputReaderThread =
+           new ReaderThread(engineExecutionContext, bufferedReader, taskExtractor, true, counter);
+       errReaderThread =
+           new ReaderThread(engineExecutionContext, errorsReader, taskExtractor, false, counter);
+
+       inputReaderThread.start();
+       errReaderThread.start();
+
+       int exitCode = taskProcess.waitFor();
+       counter.await();
+
+       if (exitCode != 0) {
+         return new ErrorExecuteResponse("run shell failed", new ShellCodeErrorException());
+       }
+       return new SuccessExecuteResponse();
+
+     } catch (InterruptedException e) {
+       // Restore the flag so that callers further up can still observe the interruption.
+       Thread.currentThread().interrupt();
+       logger.error("Execute shell code failed, reason:", e);
+       return new ErrorExecuteResponse("run shell failed", e);
+
+     } catch (Exception e) {
+       logger.error("Execute shell code failed, reason:", e);
+       return new ErrorExecuteResponse("run shell failed", e);
+
+     } finally {
+       // Each thread is guarded by its own reference: either may still be null when the failure
+       // happened before it was created.
+       if (inputReaderThread != null) {
+         inputReaderThread.onDestroy();
+       }
+       if (errReaderThread != null) {
+         errReaderThread.onDestroy();
+       }
+       IOUtils.closeQuietly(bufferedReader);
+       IOUtils.closeQuietly(errorsReader);
+     }
+   }
+
+   /**
+    * Reads the user-specified runtime arguments from the context properties.
+    *
+    * <p>Arguments are only honoured for single-paragraph tasks. Reading them is best effort: any
+    * problem (wrong value type, null value, non-string elements) is logged and treated as "no
+    * arguments".
+    *
+    * @return the arguments, or an empty array when none are configured or they cannot be read
+    */
+   private String[] readRuntimeArgs(EngineExecutionContext context) {
+     if (context.getTotalParagraph() != 1
+         || context.getProperties() == null
+         || !context.getProperties().containsKey(ShellEngineConnPluginConst.RUNTIME_ARGS_KEY)) {
+       return new String[0];
+     }
+     try {
+       // The cast lives inside the try so that an unexpected value type is tolerated as well.
+       @SuppressWarnings("unchecked")
+       List<String> argsList =
+           (List<String>) context.getProperties().get(ShellEngineConnPluginConst.RUNTIME_ARGS_KEY);
+       String[] argsArr = argsList.toArray(new String[0]);
+       logger.info(
+           "Will execute shell task with user-specified arguments: '{}'",
+           Arrays.toString(argsArr));
+       return argsArr;
+     } catch (Exception e) {
+       logger.warn(
+           "Cannot read user-input shell arguments. Will execute shell task without them.", e);
+       return new String[0];
+     }
+   }
+
+   /**
+    * Determines the user-specified working directory from the context properties.
+    *
+    * <p>The directory is only honoured for single-paragraph tasks and only when it exists. Reading
+    * it is best effort: any problem is logged and the default working directory is used.
+    *
+    * @return the directory to run in, or {@code null} to use the default working directory
+    */
+   private String resolveWorkingDirectory(EngineExecutionContext context) {
+     if (context.getTotalParagraph() != 1
+         || context.getProperties() == null
+         || !context
+             .getProperties()
+             .containsKey(ShellEngineConnPluginConst.SHELL_RUNTIME_WORKING_DIRECTORY)) {
+       return null;
+     }
+     try {
+       // The cast lives inside the try so that an unexpected value type is tolerated as well.
+       String wdStr =
+           (String)
+               context
+                   .getProperties()
+                   .get(ShellEngineConnPluginConst.SHELL_RUNTIME_WORKING_DIRECTORY);
+       if (isExecutePathExist(wdStr)) {
+         logger.info(
+             "Will execute shell task under user-specified working-directory: '{}'", wdStr);
+         return wdStr;
+       }
+       logger.warn(
+           "User-specified working-directory: '{}' does not exist or user does not have access "
+               + "permission. Will execute shell task under default working-directory. "
+               + "Please contact the administrator!",
+           wdStr);
+     } catch (Exception e) {
+       logger.warn(
+           "Cannot read user-input working-directory. Will execute shell task under default working-directory.",
+           e);
+     }
+     return null;
+   }
+
+   /** Returns whether {@code executePath} names an existing directory. */
+   private boolean isExecutePathExist(String executePath) {
+     File etlHomeDir = new File(executePath);
+     return (etlHomeDir.exists() && etlHomeDir.isDirectory());
+   }
+
+   /** Builds the command line that runs {@code code} without arguments. */
+   private String[] generateRunCode(String code) {
+     return new String[] {"sh", "-c", code};
+   }
+
+   /**
+    * Builds the command line that runs {@code code} with {@code args} as positional parameters.
+    *
+    * <p>The arguments are echoed into {@code xargs}, which appends them to {@code sh -c '<code>'};
+    * the leading {@code dummy} word therefore ends up as {@code $0} of the inner shell.
+    */
+   private String[] generateRunCodeWithArgs(String code, String[] args) {
+     // The code is embedded in a single-quoted shell word. A single quote inside the code would
+     // end that word early, so each one is rewritten as '\'' (close quote, escaped quote, reopen).
+     String quotedCode = "'" + code.replace("'", "'\\''") + "'";
+     // NOTE(review): args are still interpolated unescaped into a double-quoted string, so
+     // characters such as $, ` or " inside an argument are interpreted by the outer shell.
+     return new String[] {
+       "sh", "-c", "echo \"dummy " + StringUtils.join(args, " ") + "\" | xargs sh -c " + quotedCode
+     };
+   }
+
+   /** Returns the service instance name of this process followed by {@code _} and the id. */
+   @Override
+   public String getId() {
+     return Sender.getThisServiceInstance().getInstance() + "_" + id;
+   }
+
+   /**
+    * Reports a single progress entry for the current job, chosen by whether {@link
+    * #progress(String)} is still zero.
+    *
+    * @return an empty array while no execution context is known, otherwise a one-element array
+    */
+   @Override
+   public JobProgressInfo[] getProgressInfo(String taskID) {
+     EngineExecutionContext context = this.engineExecutionContext;
+     if (context == null) {
+       return new JobProgressInfo[0];
+     }
+
+     String jobId = context.getJobId().isDefined() ? context.getJobId().get() : "";
+     JobProgressInfo info =
+         progress(taskID) == 0.0f
+             ? new JobProgressInfo(jobId, 1, 1, 0, 0)
+             : new JobProgressInfo(jobId, 1, 0, 0, 1);
+     return new JobProgressInfo[] {info};
+   }
+
+   /**
+    * Returns the current paragraph divided by the total paragraph count of the execution context,
+    * or {@code 0.0f} while no context is known.
+    */
+   @Override
+   public float progress(String taskID) {
+     EngineExecutionContext context = this.engineExecutionContext;
+     if (context == null) {
+       return 0.0f;
+     }
+     return context.getCurrentParagraph() / (float) context.getTotalParagraph();
+   }
+
+   @Override
+   public boolean supportCallBackLogs() {
+     // todo
+     return true;
+   }
+
+   /** Always returns {@code null}. */
+   @Override
+   public NodeResource requestExpectedResource(NodeResource expectedResource) {
+     return null;
+   }
+
+   /** Builds a node resource whose used resource is derived from the engine creation options. */
+   @Override
+   public NodeResource getCurrentNodeResource() {
+     CommonNodeResource resource = new CommonNodeResource();
+     resource.setUsedResource(
+         NodeResourceUtils.applyAsLoadInstanceResource(
+             EngineConnObject.getEngineCreationContext().getOptions()));
+     return resource;
+   }
+
+   @Override
+   public List<Label<?>> getExecutorLabels() {
+     return executorLabels;
+   }
+
+   /** Replaces the executor labels with {@code labels}; a {@code null} argument is ignored. */
+   @Override
+   public void setExecutorLabels(List<Label<?>> labels) {
+     if (labels != null) {
+       executorLabels.clear();
+       executorLabels.addAll(labels);
+     }
+   }
+
+   /**
+    * Kills the most recently started shell process and the YARN applications found in its output.
+    *
+    * <p>Both steps are skipped when nothing has been started yet, so calling this before the first
+    * task is safe.
+    *
+    * @param taskID id of the task to kill, forwarded to {@code super.killTask}
+    */
+   @Override
+   public void killTask(String taskID) {
+
+     /*
+      Kill sub-processes
+     */
+     Process runningProcess = process;
+     if (runningProcess != null) {
+       int pid = getPid(runningProcess);
+       if (pid > 0) {
+         GovernanceUtils.killProcess(
+             String.valueOf(pid), "kill task " + taskID + " process", false);
+       } else {
+         // getPid() signals failure with -1. That sentinel must never be handed to a kill command
+         // as if it were a real pid, so fall back to the JDK's own process termination.
+         logger.warn(
+             "Cannot resolve pid of task {}, destroying its process handle directly instead.",
+             taskID);
+         runningProcess.destroyForcibly();
+       }
+     }
+
+     /*
+      Kill yarn-applications
+     */
+     YarnAppIdExtractor currentExtractor = extractor;
+     if (currentExtractor != null) {
+       List<String> yarnAppIds = currentExtractor.getExtractedYarnAppIds();
+       GovernanceUtils.killYarnJobApp(yarnAppIds);
+       logger.info("Finished kill yarn app ids in the engine of ({})", getId());
+     }
+     super.killTask(taskID);
+   }
+
+   /**
+    * Resolves the operating-system pid of {@code process} via reflection.
+    *
+    * <p>{@code Process.pid()} is tried first (it exists from Java 9 on). When that method is not
+    * available, the private {@code pid} field of the concrete process class is read instead, which
+    * is how the pid is reachable on a Java 8 Unix JVM. Looking the field up on the runtime class
+    * avoids hard-coding {@code java.lang.UNIXProcess}, a class that newer JDKs no longer ship.
+    *
+    * @param process the process whose pid is wanted; may be {@code null}
+    * @return the pid, or {@code -1} when it cannot be determined
+    */
+   private int getPid(Process process) {
+     if (process == null) {
+       return -1;
+     }
+     try {
+       // Looked up reflectively so that the class still compiles against Java 8.
+       return ((Number) Process.class.getMethod("pid").invoke(process)).intValue();
+     } catch (ReflectiveOperationException | RuntimeException ignored) {
+       // No usable Process.pid() on this JVM; fall through to the private-field lookup below.
+     }
+     try {
+       Field pidField = process.getClass().getDeclaredField("pid");
+       pidField.setAccessible(true);
+       return pidField.getInt(process);
+     } catch (Exception e) {
+       logger.warn("Failed to acquire pid for shell process", e);
+       return -1;
+     }
+   }
+
+   /**
+    * Destroys the most recently started process, if there is one, and then closes the base
+    * executor. {@code super.close()} runs even when destroying the process fails.
+    */
+   @Override
+   public void close() {
+     Process runningProcess = process;
+     try {
+       if (runningProcess != null) {
+         runningProcess.destroy();
+       }
+     } catch (Exception e) {
+       logger.error("kill process " + runningProcess + " failed ", e);
+     } finally {
+       super.close();
+     }
+   }
+ }
diff --git a/linkis-engineconn-plugins/shell/src/main/java/org/apache/linkis/manager/engineplugin/shell/executor/YarnAppIdExtractor.java b/linkis-engineconn-plugins/shell/src/main/java/org/apache/linkis/manager/engineplugin/shell/executor/YarnAppIdExtractor.java
new file mode 100644
index 000000000..c138e79e6
--- /dev/null
+++ b/linkis-engineconn-plugins/shell/src/main/java/org/apache/linkis/manager/engineplugin/shell/executor/YarnAppIdExtractor.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.manager.engineplugin.shell.executor;
+
+import org.apache.linkis.engineconn.common.conf.EngineConnConf;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.*;
+import java.util.Collections;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+ /**
+  * Collects YARN application ids from lines of text.
+  *
+  * <p>Each line is matched against the regular expression configured in {@code
+  * EngineConnConf.SPARK_ENGINE_CONN_YARN_APP_ID_PARSE_REGEX}; capture group 2 of the first match
+  * in a line is stored. Ids are kept in a synchronized set, so duplicates collapse.
+  */
+ public class YarnAppIdExtractor {
+
+   private final Set<String> appIdList = Collections.synchronizedSet(new HashSet<>());
+
+   private final String regex =
+       EngineConnConf.SPARK_ENGINE_CONN_YARN_APP_ID_PARSE_REGEX().getValue();
+   private final Pattern pattern = Pattern.compile(regex);
+
+   /**
+    * Scans one line and records the id it contains, if any.
+    *
+    * @param content the line to scan; blank or {@code null} input is ignored
+    */
+   public void appendLineToExtractor(String content) {
+     if (StringUtils.isNotBlank(content)) {
+       Matcher lineMatcher = pattern.matcher(content);
+       // Only the first match of a line is considered.
+       if (lineMatcher.find()) {
+         appIdList.add(lineMatcher.group(2));
+       }
+     }
+   }
+
+   /**
+    * Returns a copy of the ids collected so far.
+    *
+    * @return a new list that is independent of the internal set
+    */
+   public List<String> getExtractedYarnAppIds() {
+     List<String> snapshot;
+     // Copying iterates the synchronized set, which requires holding its monitor.
+     synchronized (appIdList) {
+       snapshot = new ArrayList<>(appIdList);
+     }
+     return snapshot;
+   }
+ }
diff --git a/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/ShellEngineConnPlugin.scala b/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/ShellEngineConnPlugin.scala
deleted file mode 100644
index 115b4763e..000000000
--- a/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/ShellEngineConnPlugin.scala
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.linkis.manager.engineplugin.shell
-
-import org.apache.linkis.manager.engineplugin.common.EngineConnPlugin
-import org.apache.linkis.manager.engineplugin.common.creation.EngineConnFactory
-import org.apache.linkis.manager.engineplugin.common.launch.EngineConnLaunchBuilder
-import org.apache.linkis.manager.engineplugin.common.resource.{
- EngineResourceFactory,
- GenericEngineResourceFactory
-}
-import org.apache.linkis.manager.engineplugin.shell.builder.ShellProcessEngineConnLaunchBuilder
-import org.apache.linkis.manager.engineplugin.shell.factory.ShellEngineConnFactory
-import org.apache.linkis.manager.label.entity.Label
-import org.apache.linkis.manager.label.entity.engine.EngineType
-import org.apache.linkis.manager.label.utils.EngineTypeLabelCreator
-
-import java.util
-
-class ShellEngineConnPlugin extends EngineConnPlugin {
-
- private val resourceLocker = new Object()
-
- private val engineLaunchBuilderLocker = new Object()
-
- private val engineFactoryLocker = new Object()
-
- private var engineResourceFactory: EngineResourceFactory = _
-
- private var engineLaunchBuilder: EngineConnLaunchBuilder = _
-
- private var engineFactory: EngineConnFactory = _
-
- private val defaultLabels: util.List[Label[_]] = new util.ArrayList[Label[_]]()
-
- override def init(params: util.Map[String, AnyRef]): Unit = {
- val engineTypeLabel = EngineTypeLabelCreator.createEngineTypeLabel(EngineType.SHELL.toString)
- this.defaultLabels.add(engineTypeLabel)
- }
-
- override def getEngineResourceFactory: EngineResourceFactory = {
- if (null == engineResourceFactory) resourceLocker synchronized {
- engineResourceFactory = new GenericEngineResourceFactory
- }
- engineResourceFactory
- }
-
- override def getEngineConnLaunchBuilder: EngineConnLaunchBuilder = {
- new ShellProcessEngineConnLaunchBuilder
- }
-
- override def getEngineConnFactory: EngineConnFactory = {
- if (null == engineFactory) engineFactoryLocker synchronized {
- engineFactory = new ShellEngineConnFactory
- }
- engineFactory
- }
-
- override def getDefaultLabels: util.List[Label[_]] = this.defaultLabels
-
-}
diff --git a/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/exception/NoCorrectUserException.scala b/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/exception/NoCorrectUserException.scala
deleted file mode 100644
index e91c5923c..000000000
--- a/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/exception/NoCorrectUserException.scala
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.linkis.manager.engineplugin.shell.exception
-
-import org.apache.linkis.common.exception.ErrorException
-import org.apache.linkis.manager.engineplugin.shell.errorcode.LinkisCommonsErrorCodeSummary._
-
-case class NoCorrectUserException()
- extends ErrorException(NO_ILLEGAL_USER_HOLDS.getErrorCode, NO_ILLEGAL_USER_HOLDS.getErrorDesc)
-
-case class ShellCodeErrorException()
- extends ErrorException(SHELL_CODE_IS_WRONG.getErrorCode, SHELL_CODE_IS_WRONG.getErrorDesc)
diff --git a/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/ReaderThread.scala b/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/ReaderThread.scala
deleted file mode 100644
index 8277cb116..000000000
--- a/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/ReaderThread.scala
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.linkis.manager.engineplugin.shell.executor
-
-import org.apache.linkis.common.conf.CommonVars
-import org.apache.linkis.common.utils.{Logging, Utils}
-import org.apache.linkis.engineconn.computation.executor.execute.EngineExecutionContext
-
-import org.apache.commons.io.IOUtils
-import org.apache.commons.lang3.StringUtils
-
-import java.io.BufferedReader
-import java.util
-import java.util.concurrent.CountDownLatch
-
-class ReaderThread extends Thread with Logging {
-
- private var engineExecutionContext: EngineExecutionContext = _
- private var inputReader: BufferedReader = _
- private var extractor: YarnAppIdExtractor = _
- private var isStdout: Boolean = false
- private val logListCount = CommonVars[Int]("wds.linkis.engineconn.log.list.count", 50)
- private var counter: CountDownLatch = _
-
- private var isReaderAlive = true
-
- def this(
- engineExecutionContext: EngineExecutionContext,
- inputReader: BufferedReader,
- extractor: YarnAppIdExtractor,
- isStdout: Boolean,
- counter: CountDownLatch
- ) {
- this()
- this.inputReader = inputReader
- this.engineExecutionContext = engineExecutionContext
- this.extractor = extractor
- this.isStdout = isStdout
- this.counter = counter
- }
-
- def onDestroy(): Unit = {
- isReaderAlive = false
- }
-
- def startReaderThread(): Unit = {
- Utils.tryCatch {
- this.start()
- } { t =>
- throw t
- }
- }
-
- override def run(): Unit = {
- Utils.tryCatch {
- var line: String = null
- val logArray: util.List[String] = new util.ArrayList[String]
- while ({ line = inputReader.readLine(); line != null && isReaderAlive }) {
- logger.info("read logger line :{}", line)
- logArray.add(line)
- extractor.appendLineToExtractor(line)
- if (isStdout) engineExecutionContext.appendTextResultSet(line)
- if (logArray.size > logListCount.getValue) {
- val linelist = StringUtils.join(logArray, "\n")
- engineExecutionContext.appendStdout(linelist)
- logArray.clear()
- }
- }
- if (logArray.size > 0) {
- val linelist = StringUtils.join(logArray, "\n")
- engineExecutionContext.appendStdout(linelist)
- logArray.clear()
- }
- } { t =>
- logger.warn("inputReader reading the input stream", t)
- }
- IOUtils.closeQuietly(inputReader)
- counter.countDown()
- }
-
-}
diff --git a/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/ShellEngineConnConcurrentExecutor.scala b/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/ShellEngineConnConcurrentExecutor.scala
deleted file mode 100644
index b66353ca0..000000000
--- a/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/ShellEngineConnConcurrentExecutor.scala
+++ /dev/null
@@ -1,348 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.linkis.manager.engineplugin.shell.executor
-
-import org.apache.linkis.common.utils.{Logging, OverloadUtils, Utils}
-import org.apache.linkis.engineconn.computation.executor.execute.{
- ConcurrentComputationExecutor,
- EngineExecutionContext
-}
-import org.apache.linkis.engineconn.core.EngineConnObject
-import org.apache.linkis.governance.common.utils.GovernanceUtils
-import org.apache.linkis.manager.common.entity.resource.{
- CommonNodeResource,
- LoadResource,
- NodeResource
-}
-import org.apache.linkis.manager.engineplugin.common.util.NodeResourceUtils
-import org.apache.linkis.manager.engineplugin.shell.common.ShellEngineConnPluginConst
-import org.apache.linkis.manager.engineplugin.shell.conf.ShellEngineConnConf
-import org.apache.linkis.manager.engineplugin.shell.exception.ShellCodeErrorException
-import org.apache.linkis.manager.label.entity.Label
-import org.apache.linkis.protocol.engine.JobProgressInfo
-import org.apache.linkis.rpc.Sender
-import org.apache.linkis.scheduler.executer.{
- ErrorExecuteResponse,
- ExecuteResponse,
- SuccessExecuteResponse
-}
-
-import org.apache.commons.lang3.StringUtils
-
-import java.io.{BufferedReader, File, InputStreamReader}
-import java.util
-import java.util.concurrent.{ConcurrentHashMap, CountDownLatch}
-import java.util.concurrent.atomic.AtomicBoolean
-
-import scala.collection.mutable.ArrayBuffer
-import scala.concurrent.ExecutionContextExecutorService
-
-class ShellEngineConnConcurrentExecutor(id: Int, maxRunningNumber: Int)
- extends ConcurrentComputationExecutor
- with Logging {
-
- private var engineExecutionContext: EngineExecutionContext = _
-
- private val executorLabels: util.List[Label[_]] = new util.ArrayList[Label[_]]()
-
- private val shellECTaskInfoCache: util.Map[String, ShellECTaskInfo] =
- new ConcurrentHashMap[String, ShellECTaskInfo]()
-
- private implicit val logAsyncService: ExecutionContextExecutorService =
- Utils.newCachedExecutionContext(
- ShellEngineConnConf.LOG_SERVICE_MAX_THREAD_SIZE,
- "ShelLogService-Thread-"
- )
-
- override def init(): Unit = {
- logger.info(s"Ready to change engine state!")
- super.init
- }
-
- override def executeCompletely(
- engineExecutionContext: EngineExecutionContext,
- code: String,
- completedLine: String
- ): ExecuteResponse = {
- val newcode = completedLine + code
- logger.debug("newcode is " + newcode)
- executeLine(engineExecutionContext, newcode)
- }
-
- override def executeLine(
- engineExecutionContext: EngineExecutionContext,
- code: String
- ): ExecuteResponse = {
-
- if (null != engineExecutionContext) {
- this.engineExecutionContext = engineExecutionContext
- logger.info("Shell executor reset new engineExecutionContext!")
- }
-
- if (engineExecutionContext.getJobId.isEmpty) {
- return ErrorExecuteResponse("taskID is null", null)
- }
-
- val taskId = engineExecutionContext.getJobId.get
- var bufferedReader: BufferedReader = null
- var errorsReader: BufferedReader = null
-
- val completed = new AtomicBoolean(false)
- var errReaderThread: ReaderThread = null
- var inputReaderThread: ReaderThread = null
-
- try {
- engineExecutionContext.appendStdout(s"$getId >> ${code.trim}")
-
- val argsArr =
- if (
- engineExecutionContext.getTotalParagraph == 1 &&
- engineExecutionContext.getProperties != null &&
- engineExecutionContext.getProperties.containsKey(
- ShellEngineConnPluginConst.RUNTIME_ARGS_KEY
- )
- ) {
- Utils.tryCatch {
- val argsList = engineExecutionContext.getProperties
- .get(ShellEngineConnPluginConst.RUNTIME_ARGS_KEY)
- .asInstanceOf[util.ArrayList[String]]
- logger.info(
- "Will execute shell task with user-specified arguments: \'" + argsList
- .toArray(new Array[String](argsList.size()))
- .mkString("\' \'") + "\'"
- )
- argsList.toArray(new Array[String](argsList.size()))
- } { t =>
- logger.warn(
- "Cannot read user-input shell arguments. Will execute shell task without them.",
- t
- )
- null
- }
- } else {
- null
- }
-
- val workingDirectory =
- if (
- engineExecutionContext.getTotalParagraph == 1 &&
- engineExecutionContext.getProperties != null &&
- engineExecutionContext.getProperties.containsKey(
- ShellEngineConnPluginConst.SHELL_RUNTIME_WORKING_DIRECTORY
- )
- ) {
- Utils.tryCatch {
- val wdStr = engineExecutionContext.getProperties
- .get(ShellEngineConnPluginConst.SHELL_RUNTIME_WORKING_DIRECTORY)
- .asInstanceOf[String]
- if (isExecutePathExist(wdStr)) {
- logger.info(
- "Will execute shell task under user-specified working-directory: \'" + wdStr
- )
- wdStr
- } else {
- logger.warn(
- "User-specified working-directory: \'" + wdStr + "\' does not exist or user does not have access permission. " +
- "Will execute shell task under default working-directory. Please contact the administrator!"
- )
- null
- }
- } { t =>
- logger.warn(
- "Cannot read user-input working-directory. Will execute shell task under default working-directory.",
- t
- )
- null
- }
- } else {
- null
- }
-
- val generatedCode = if (argsArr == null || argsArr.isEmpty) {
- generateRunCode(code)
- } else {
- generateRunCodeWithArgs(code, argsArr)
- }
-
- val processBuilder: ProcessBuilder = new ProcessBuilder(generatedCode: _*)
- if (StringUtils.isNotBlank(workingDirectory)) {
- processBuilder.directory(new File(workingDirectory))
- }
-
- processBuilder.redirectErrorStream(false)
- val extractor = new YarnAppIdExtractor
- val process = processBuilder.start()
- bufferedReader = new BufferedReader(new InputStreamReader(process.getInputStream))
- errorsReader = new BufferedReader(new InputStreamReader(process.getErrorStream))
- // add task id and task Info cache
- shellECTaskInfoCache.put(taskId, ShellECTaskInfo(taskId, process, extractor))
-
- val counter: CountDownLatch = new CountDownLatch(2)
- inputReaderThread =
- new ReaderThread(engineExecutionContext, bufferedReader, extractor, true, counter)
- errReaderThread =
- new ReaderThread(engineExecutionContext, errorsReader, extractor, false, counter)
-
- logAsyncService.execute(inputReaderThread)
- logAsyncService.execute(errReaderThread)
-
- val exitCode = process.waitFor()
- counter.await()
-
- completed.set(true)
-
- if (exitCode != 0) {
- ErrorExecuteResponse("run shell failed", ShellCodeErrorException())
- } else SuccessExecuteResponse()
-
- } catch {
- case e: Exception =>
- logger.error("Execute shell code failed, reason:", e)
- ErrorExecuteResponse("run shell failed", e)
- } finally {
- if (null != errorsReader) {
- inputReaderThread.onDestroy()
- }
- if (null != inputReaderThread) {
- errReaderThread.onDestroy()
- }
- shellECTaskInfoCache.remove(taskId)
- }
- }
-
- private def isExecutePathExist(executePath: String): Boolean = {
- val etlHomeDir: File = new File(executePath)
- (etlHomeDir.exists() && etlHomeDir.isDirectory)
- }
-
- private def generateRunCode(code: String): Array[String] = {
- Array("sh", "-c", code)
- }
-
- private def generateRunCodeWithArgs(code: String, args: Array[String]): Array[String] = {
- Array("sh", "-c", "echo \"dummy " + args.mkString(" ") + "\" | xargs sh -c \'" + code + "\'")
- }
-
- override def getId(): String = Sender.getThisServiceInstance.getInstance + "_" + id
-
- override def getProgressInfo(taskID: String): Array[JobProgressInfo] = {
- val jobProgressInfo = new ArrayBuffer[JobProgressInfo]()
- if (null == this.engineExecutionContext) {
- return jobProgressInfo.toArray
- }
- if (0.0f == progress(taskID)) {
- jobProgressInfo += JobProgressInfo(engineExecutionContext.getJobId.getOrElse(""), 1, 1, 0, 0)
- } else {
- jobProgressInfo += JobProgressInfo(engineExecutionContext.getJobId.getOrElse(""), 1, 0, 0, 1)
- }
- jobProgressInfo.toArray
- }
-
- override def progress(taskID: String): Float = {
- if (null != this.engineExecutionContext) {
- this.engineExecutionContext.getCurrentParagraph / this.engineExecutionContext.getTotalParagraph
- .asInstanceOf[Float]
- } else {
- 0.0f
- }
- }
-
- override def supportCallBackLogs(): Boolean = {
- // todo
- true
- }
-
- override def requestExpectedResource(expectedResource: NodeResource): NodeResource = {
- null
- }
-
- override def getCurrentNodeResource(): NodeResource = {
- val resource = new CommonNodeResource
- resource.setUsedResource(
- NodeResourceUtils
- .applyAsLoadInstanceResource(EngineConnObject.getEngineCreationContext.getOptions)
- )
- resource
- }
-
- override def getExecutorLabels(): util.List[Label[_]] = executorLabels
-
- override def setExecutorLabels(labels: util.List[Label[_]]): Unit = {
- if (null != labels) {
- executorLabels.clear()
- executorLabels.addAll(labels)
- }
- }
-
- override def killTask(taskID: String): Unit = {
- val shellECTaskInfo = shellECTaskInfoCache.remove(taskID)
- if (null == shellECTaskInfo) {
- return
- }
- /*
- Kill sub-processes
- */
- val pid = getPid(shellECTaskInfo.process)
- GovernanceUtils.killProcess(String.valueOf(pid), s"kill task $taskID process", false)
- /*
- Kill yarn-applications
- */
- val yarnAppIds = shellECTaskInfo.yarnAppIdExtractor.getExtractedYarnAppIds()
- GovernanceUtils.killYarnJobApp(yarnAppIds)
- logger.info(
- s"Finished kill yarn app ids in the engine of (${getId()}). The yarn app ids are ${yarnAppIds}"
- )
- super.killTask(taskID)
-
- }
-
- private def getPid(process: Process): Int = {
- Utils.tryCatch {
- val clazz = Class.forName("java.lang.UNIXProcess");
- val field = clazz.getDeclaredField("pid");
- field.setAccessible(true);
- field.get(process).asInstanceOf[Int]
- } { t =>
- logger.warn("Failed to acquire pid for shell process")
- -1
- }
- }
-
- override def close(): Unit = {
- Utils.tryCatch {
- killAll()
- logAsyncService.shutdown()
- } { t: Throwable =>
- logger.error(s"Shell ec failed to close ", t)
- }
- super.close()
- }
-
- override def killAll(): Unit = {
- val iterator = shellECTaskInfoCache.values().iterator()
- while (iterator.hasNext) {
- val shellECTaskInfo = iterator.next()
- Utils.tryAndWarn(killTask(shellECTaskInfo.taskId))
- }
- }
-
- override def getConcurrentLimit: Int = {
- maxRunningNumber
- }
-
-}
diff --git a/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/ShellEngineConnExecutor.scala b/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/ShellEngineConnExecutor.scala
deleted file mode 100644
index c9f320625..000000000
--- a/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/ShellEngineConnExecutor.scala
+++ /dev/null
@@ -1,319 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.linkis.manager.engineplugin.shell.executor
-
-import org.apache.linkis.common.utils.{Logging, Utils}
-import org.apache.linkis.engineconn.computation.executor.execute.{
- ComputationExecutor,
- EngineExecutionContext
-}
-import org.apache.linkis.engineconn.core.EngineConnObject
-import org.apache.linkis.governance.common.utils.GovernanceUtils
-import org.apache.linkis.manager.common.entity.resource.{
- CommonNodeResource,
- LoadInstanceResource,
- NodeResource
-}
-import org.apache.linkis.manager.engineplugin.common.conf.EngineConnPluginConf
-import org.apache.linkis.manager.engineplugin.common.util.NodeResourceUtils
-import org.apache.linkis.manager.engineplugin.shell.common.ShellEngineConnPluginConst
-import org.apache.linkis.manager.engineplugin.shell.exception.ShellCodeErrorException
-import org.apache.linkis.manager.label.entity.Label
-import org.apache.linkis.protocol.engine.JobProgressInfo
-import org.apache.linkis.rpc.Sender
-import org.apache.linkis.scheduler.executer.{
- ErrorExecuteResponse,
- ExecuteResponse,
- SuccessExecuteResponse
-}
-
-import org.apache.commons.io.IOUtils
-import org.apache.commons.lang3.StringUtils
-
-import java.io.{BufferedReader, File, FileReader, InputStreamReader, IOException}
-import java.util
-import java.util.concurrent.CountDownLatch
-import java.util.concurrent.atomic.AtomicBoolean
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable.ArrayBuffer
-
-class ShellEngineConnExecutor(id: Int) extends ComputationExecutor with Logging {
-
- private var engineExecutionContext: EngineExecutionContext = _
-
- private val executorLabels: util.List[Label[_]] = new util.ArrayList[Label[_]]()
-
- private var process: Process = _
-
- private var extractor: YarnAppIdExtractor = _
-
- override def init(): Unit = {
- logger.info(s"Ready to change engine state!")
- super.init
- }
-
- override def executeCompletely(
- engineExecutionContext: EngineExecutionContext,
- code: String,
- completedLine: String
- ): ExecuteResponse = {
- val newcode = completedLine + code
- logger.debug("newcode is " + newcode)
- executeLine(engineExecutionContext, newcode)
- }
-
- override def executeLine(
- engineExecutionContext: EngineExecutionContext,
- code: String
- ): ExecuteResponse = {
-
- if (null != engineExecutionContext) {
- this.engineExecutionContext = engineExecutionContext
- logger.info("Shell executor reset new engineExecutionContext!")
- }
-
- var bufferedReader: BufferedReader = null
- var errorsReader: BufferedReader = null
-
- val completed = new AtomicBoolean(false)
- var errReaderThread: ReaderThread = null
- var inputReaderThread: ReaderThread = null
-
- try {
- engineExecutionContext.appendStdout(s"$getId >> ${code.trim}")
-
- val argsArr =
- if (
- engineExecutionContext.getTotalParagraph == 1 &&
- engineExecutionContext.getProperties != null &&
- engineExecutionContext.getProperties.containsKey(
- ShellEngineConnPluginConst.RUNTIME_ARGS_KEY
- )
- ) {
- Utils.tryCatch {
- val argsList = engineExecutionContext.getProperties
- .get(ShellEngineConnPluginConst.RUNTIME_ARGS_KEY)
- .asInstanceOf[util.ArrayList[String]]
- logger.info(
- "Will execute shell task with user-specified arguments: \'" + argsList
- .toArray(new Array[String](argsList.size()))
- .mkString("\' \'") + "\'"
- )
- argsList.toArray(new Array[String](argsList.size()))
- } { t =>
- logger.warn(
- "Cannot read user-input shell arguments. Will execute shell task without them.",
- t
- )
- null
- }
- } else {
- null
- }
-
- val workingDirectory =
- if (
- engineExecutionContext.getTotalParagraph == 1 &&
- engineExecutionContext.getProperties != null &&
- engineExecutionContext.getProperties.containsKey(
- ShellEngineConnPluginConst.SHELL_RUNTIME_WORKING_DIRECTORY
- )
- ) {
- Utils.tryCatch {
- val wdStr = engineExecutionContext.getProperties
- .get(ShellEngineConnPluginConst.SHELL_RUNTIME_WORKING_DIRECTORY)
- .asInstanceOf[String]
- if (isExecutePathExist(wdStr)) {
- logger.info(
- "Will execute shell task under user-specified working-directory: \'" + wdStr
- )
- wdStr
- } else {
- logger.warn(
- "User-specified working-directory: \'" + wdStr + "\' does not exist or user does not have access permission. " +
- "Will execute shell task under default working-directory. Please contact the administrator!"
- )
- null
- }
- } { t =>
- logger.warn(
- "Cannot read user-input working-directory. Will execute shell task under default working-directory.",
- t
- )
- null
- }
- } else {
- null
- }
-
- val generatedCode = if (argsArr == null || argsArr.isEmpty) {
- generateRunCode(code)
- } else {
- generateRunCodeWithArgs(code, argsArr)
- }
-
- val processBuilder: ProcessBuilder = new ProcessBuilder(generatedCode: _*)
- if (StringUtils.isNotBlank(workingDirectory)) {
- processBuilder.directory(new File(workingDirectory))
- }
-
- processBuilder.redirectErrorStream(false)
- extractor = new YarnAppIdExtractor
- process = processBuilder.start()
-
- bufferedReader = new BufferedReader(new InputStreamReader(process.getInputStream))
- errorsReader = new BufferedReader(new InputStreamReader(process.getErrorStream))
- val counter: CountDownLatch = new CountDownLatch(2)
- inputReaderThread =
- new ReaderThread(engineExecutionContext, bufferedReader, extractor, true, counter)
- errReaderThread =
- new ReaderThread(engineExecutionContext, errorsReader, extractor, false, counter)
-
- inputReaderThread.start()
- errReaderThread.start()
-
- val exitCode = process.waitFor()
- counter.await()
-
- completed.set(true)
-
- if (exitCode != 0) {
- ErrorExecuteResponse("run shell failed", ShellCodeErrorException())
- } else SuccessExecuteResponse()
- } catch {
- case e: Exception =>
- logger.error("Execute shell code failed, reason:", e)
- ErrorExecuteResponse("run shell failed", e)
- } finally {
- if (null != errorsReader) {
- inputReaderThread.onDestroy()
- }
- if (null != inputReaderThread) {
- errReaderThread.onDestroy()
- }
- IOUtils.closeQuietly(bufferedReader)
- IOUtils.closeQuietly(errorsReader)
- }
- }
-
- private def isExecutePathExist(executePath: String): Boolean = {
- val etlHomeDir: File = new File(executePath)
- (etlHomeDir.exists() && etlHomeDir.isDirectory())
- }
-
- private def generateRunCode(code: String): Array[String] = {
- Array("sh", "-c", code)
- }
-
- private def generateRunCodeWithArgs(code: String, args: Array[String]): Array[String] = {
- Array("sh", "-c", "echo \"dummy " + args.mkString(" ") + "\" | xargs sh -c \'" + code + "\'")
- }
-
- override def getId(): String = Sender.getThisServiceInstance.getInstance + "_" + id
-
- override def getProgressInfo(taskID: String): Array[JobProgressInfo] = {
- val jobProgressInfo = new ArrayBuffer[JobProgressInfo]()
- if (null == this.engineExecutionContext) {
- return jobProgressInfo.toArray
- }
- if (0.0f == progress(taskID)) {
- jobProgressInfo += JobProgressInfo(engineExecutionContext.getJobId.getOrElse(""), 1, 1, 0, 0)
- } else {
- jobProgressInfo += JobProgressInfo(engineExecutionContext.getJobId.getOrElse(""), 1, 0, 0, 1)
- }
- jobProgressInfo.toArray
- }
-
- override def progress(taskID: String): Float = {
- if (null != this.engineExecutionContext) {
- this.engineExecutionContext.getCurrentParagraph / this.engineExecutionContext.getTotalParagraph
- .asInstanceOf[Float]
- } else {
- 0.0f
- }
- }
-
- override def supportCallBackLogs(): Boolean = {
- // todo
- true
- }
-
- override def requestExpectedResource(expectedResource: NodeResource): NodeResource = {
- null
- }
-
- override def getCurrentNodeResource(): NodeResource = {
- val resource = new CommonNodeResource
- resource.setUsedResource(
- NodeResourceUtils
- .applyAsLoadInstanceResource(EngineConnObject.getEngineCreationContext.getOptions)
- )
- resource
- }
-
- override def getExecutorLabels(): util.List[Label[_]] = executorLabels
-
- override def setExecutorLabels(labels: util.List[Label[_]]): Unit = {
- if (null != labels) {
- executorLabels.clear()
- executorLabels.addAll(labels)
- }
- }
-
- override def killTask(taskID: String): Unit = {
- /*
- Kill sub-processes
- */
- val pid = getPid(process)
- GovernanceUtils.killProcess(String.valueOf(pid), s"kill task $taskID process", false)
- /*
- Kill yarn-applications
- */
- val yarnAppIds = extractor.getExtractedYarnAppIds()
- GovernanceUtils.killYarnJobApp(yarnAppIds)
- logger.info(s"Finished kill yarn app ids in the engine of (${getId()}).}")
- super.killTask(taskID)
-
- }
-
- private def getPid(process: Process): Int = {
- Utils.tryCatch {
- val clazz = Class.forName("java.lang.UNIXProcess");
- val field = clazz.getDeclaredField("pid");
- field.setAccessible(true);
- field.get(process).asInstanceOf[Int]
- } { t =>
- logger.warn("Failed to acquire pid for shell process")
- -1
- }
- }
-
- override def close(): Unit = {
- try {
- process.destroy()
- } catch {
- case e: Exception =>
- logger.error(s"kill process ${process.toString} failed ", e)
- case t: Throwable =>
- logger.error(s"kill process ${process.toString} failed ", t)
- }
- super.close()
- }
-
-}
diff --git a/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/YarnAppIdExtractor.scala b/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/YarnAppIdExtractor.scala
deleted file mode 100644
index 2f5f79663..000000000
--- a/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/YarnAppIdExtractor.scala
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.linkis.manager.engineplugin.shell.executor
-
-import org.apache.linkis.common.utils.Logging
-import org.apache.linkis.engineconn.common.conf.EngineConnConf
-
-import org.apache.commons.lang3.StringUtils
-
-import java.io._
-import java.util
-import java.util.Collections
-import java.util.regex.Pattern
-
-class YarnAppIdExtractor extends Logging {
-
- private val appIdList: util.Set[String] = Collections.synchronizedSet(new util.HashSet[String]())
-
- private val regex = EngineConnConf.SPARK_ENGINE_CONN_YARN_APP_ID_PARSE_REGEX.getValue
- private val pattern = Pattern.compile(regex)
-
- def appendLineToExtractor(content: String): Unit = {
- if (StringUtils.isBlank(content)) return
- val yarnAppIDMatcher = pattern.matcher(content)
- if (yarnAppIDMatcher.find) {
- val yarnAppID = yarnAppIDMatcher.group(2)
- appIdList.add(yarnAppID)
- }
- }
-
- private def doExtractYarnAppId(content: String): Array[String] = {
-
- if (StringUtils.isBlank(content)) return new Array[String](0)
- // spark: Starting|Submitted|Activating.{1,100}(application_\d{13}_\d+)
- // sqoop, importtsv: Submitted application application_1609166102854_970911
-
- val stringReader = new StringReader(content)
-
- val reader = new BufferedReader(stringReader)
-
- val ret = new util.ArrayList[String]
-
- var line = reader.readLine()
- while ({
- line != null
- }) { // match application_xxx_xxx
- val mApp = pattern.matcher(line)
- if (mApp.find) {
- val candidate1 = mApp.group(2)
- if (!ret.contains(candidate1)) {
- ret.add(candidate1)
- }
- }
- line = reader.readLine
- }
- stringReader.close()
- ret.toArray(new Array[String](ret.size()))
- }
-
- def getExtractedYarnAppIds(): util.List[String] = {
- appIdList.synchronized {
- new util.ArrayList[String](appIdList)
- }
- }
-
-}
diff --git a/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/factory/ShellEngineConnFactory.scala b/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/factory/ShellEngineConnFactory.scala
old mode 100644
new mode 100755
diff --git a/linkis-engineconn-plugins/shell/src/test/scala/org/apache/linkis/manager/engineplugin/shell/TestShellEngineConnPlugin.scala b/linkis-engineconn-plugins/shell/src/test/java/org/apache/linkis/manager/engineplugin/shell/TestShellEngineConnPlugin.java
similarity index 75%
rename from linkis-engineconn-plugins/shell/src/test/scala/org/apache/linkis/manager/engineplugin/shell/TestShellEngineConnPlugin.scala
rename to linkis-engineconn-plugins/shell/src/test/java/org/apache/linkis/manager/engineplugin/shell/TestShellEngineConnPlugin.java
index 228e5312f..34ce7a671 100644
--- a/linkis-engineconn-plugins/shell/src/test/scala/org/apache/linkis/manager/engineplugin/shell/TestShellEngineConnPlugin.scala
+++ b/linkis-engineconn-plugins/shell/src/test/java/org/apache/linkis/manager/engineplugin/shell/TestShellEngineConnPlugin.java
@@ -15,19 +15,19 @@
* limitations under the License.
*/
-package org.apache.linkis.manager.engineplugin.shell
+package org.apache.linkis.manager.engineplugin.shell;
-import org.junit.jupiter.api.{Assertions, Test}
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
-class TestShellEngineConnPlugin {
+public class TestShellEngineConnPlugin {
@Test
- def testShellEngineConnExecutor: Unit = {
- val shellEngineConnPlugin = new ShellEngineConnPlugin
- Assertions.assertNotNull(shellEngineConnPlugin.getEngineConnFactory)
- Assertions.assertNotNull(shellEngineConnPlugin.getEngineConnLaunchBuilder)
- Assertions.assertNotNull(shellEngineConnPlugin.getEngineResourceFactory)
- Assertions.assertNotNull(shellEngineConnPlugin.getDefaultLabels)
+ public void testShellEngineConnExecutor() {
+ ShellEngineConnPlugin shellEngineConnPlugin = new ShellEngineConnPlugin();
+ Assertions.assertNotNull(shellEngineConnPlugin.getEngineConnFactory());
+ Assertions.assertNotNull(shellEngineConnPlugin.getEngineConnLaunchBuilder());
+ Assertions.assertNotNull(shellEngineConnPlugin.getEngineResourceFactory());
+ Assertions.assertNotNull(shellEngineConnPlugin.getDefaultLabels());
}
-
}
diff --git a/linkis-engineconn-plugins/shell/src/test/scala/org/apache/linkis/manager/engineplugin/shell/common/TestShellEngineConnPluginConst.scala b/linkis-engineconn-plugins/shell/src/test/java/org/apache/linkis/manager/engineplugin/shell/common/TestShellEngineConnPluginConst.java
similarity index 70%
rename from linkis-engineconn-plugins/shell/src/test/scala/org/apache/linkis/manager/engineplugin/shell/common/TestShellEngineConnPluginConst.scala
rename to linkis-engineconn-plugins/shell/src/test/java/org/apache/linkis/manager/engineplugin/shell/common/TestShellEngineConnPluginConst.java
index 59f00e8ff..36444cf44 100644
--- a/linkis-engineconn-plugins/shell/src/test/scala/org/apache/linkis/manager/engineplugin/shell/common/TestShellEngineConnPluginConst.scala
+++ b/linkis-engineconn-plugins/shell/src/test/java/org/apache/linkis/manager/engineplugin/shell/common/TestShellEngineConnPluginConst.java
@@ -15,19 +15,18 @@
* limitations under the License.
*/
-package org.apache.linkis.manager.engineplugin.shell.common
+package org.apache.linkis.manager.engineplugin.shell.common;
-import org.junit.jupiter.api.{Assertions, Test}
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
-class TestShellEngineConnPluginConst {
+public class TestShellEngineConnPluginConst {
@Test
- def testShellEngineConnPluginConst: Unit = {
- Assertions.assertEquals("extraArguments", ShellEngineConnPluginConst.RUNTIME_ARGS_KEY)
+ public void testShellEngineConnPluginConst() {
+ Assertions.assertEquals("extraArguments", ShellEngineConnPluginConst.RUNTIME_ARGS_KEY);
Assertions.assertEquals(
- "wds.linkis.shell.runtime.working.directory",
- ShellEngineConnPluginConst.SHELL_RUNTIME_WORKING_DIRECTORY
- )
+ "wds.linkis.shell.runtime.working.directory",
+ ShellEngineConnPluginConst.SHELL_RUNTIME_WORKING_DIRECTORY);
}
-
}
diff --git a/linkis-engineconn-plugins/shell/src/test/scala/org/apache/linkis/manager/engineplugin/shell/exception/TestNoCorrectUserException.scala b/linkis-engineconn-plugins/shell/src/test/java/org/apache/linkis/manager/engineplugin/shell/exception/TestNoCorrectUserException.java
similarity index 67%
rename from linkis-engineconn-plugins/shell/src/test/scala/org/apache/linkis/manager/engineplugin/shell/exception/TestNoCorrectUserException.scala
rename to linkis-engineconn-plugins/shell/src/test/java/org/apache/linkis/manager/engineplugin/shell/exception/TestNoCorrectUserException.java
index 50ba61bb0..c3cc8e682 100644
--- a/linkis-engineconn-plugins/shell/src/test/scala/org/apache/linkis/manager/engineplugin/shell/exception/TestNoCorrectUserException.scala
+++ b/linkis-engineconn-plugins/shell/src/test/java/org/apache/linkis/manager/engineplugin/shell/exception/TestNoCorrectUserException.java
@@ -15,24 +15,20 @@
* limitations under the License.
*/
-package org.apache.linkis.manager.engineplugin.shell.exception
+package org.apache.linkis.manager.engineplugin.shell.exception;
-import org.junit.jupiter.api.{Assertions, Test}
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
-class TestNoCorrectUserException {
+public class TestNoCorrectUserException {
@Test
- def testNoCorrectUserException: Unit = {
-
- val exception = NoCorrectUserException
- Assertions.assertNotNull(exception)
+ public void testNoCorrectUserException() {
+ Assertions.assertNotNull(new NoCorrectUserException());
}
@Test
- def testShellCodeErrorException: Unit = {
-
- val exception = ShellCodeErrorException
- Assertions.assertNotNull(exception)
+ public void testShellCodeErrorException() {
+ Assertions.assertNotNull(new ShellCodeErrorException());
}
-
}
diff --git a/linkis-engineconn-plugins/shell/src/test/java/org/apache/linkis/manager/engineplugin/shell/executor/TestShellEngineConnExecutor.java b/linkis-engineconn-plugins/shell/src/test/java/org/apache/linkis/manager/engineplugin/shell/executor/TestShellEngineConnExecutor.java
new file mode 100644
index 000000000..d327282b2
--- /dev/null
+++ b/linkis-engineconn-plugins/shell/src/test/java/org/apache/linkis/manager/engineplugin/shell/executor/TestShellEngineConnExecutor.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.manager.engineplugin.shell.executor;
+
+import org.apache.linkis.DataWorkCloudApplication;
+import org.apache.linkis.common.conf.DWCArgumentsParser;
+import org.apache.linkis.common.utils.Utils;
+import org.apache.linkis.engineconn.computation.executor.execute.EngineExecutionContext;
+import org.apache.linkis.scheduler.executer.ExecuteResponse;
+
+import scala.collection.mutable.HashMap;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class TestShellEngineConnExecutor {
+
+ @Test
+ public void testShellEngineConnExecutor() throws ReflectiveOperationException {
+ boolean isWindows = System.getProperty("os.name").startsWith("Windows");
+
+ System.setProperty("wds.linkis.server.version", "v1");
+ System.setProperty("HADOOP_CONF_DIR", "./");
+ System.setProperty(
+ "wds.linkis.engineconn.plugin.default.class",
+ "org.apache.linkis.manager.engineplugin.shell.ShellEngineConnPlugin");
+
+ HashMap<String, String> map = new HashMap<>();
+ map.put("spring.mvc.servlet.path", "/api/rest_j/v1");
+ map.put("server.port", "26380");
+ map.put("spring.application.name", "shellEngineExecutor");
+ map.put("eureka.client.register-with-eureka", "false");
+ map.put("eureka.client.fetch-registry", "false");
+ DataWorkCloudApplication.main(DWCArgumentsParser.formatSpringOptions(map.toMap(null)));
+ ShellEngineConnExecutor shellEngineConnExecutor = new ShellEngineConnExecutor(1);
+ shellEngineConnExecutor.init();
+ Assertions.assertTrue(shellEngineConnExecutor.isEngineInitialized());
+ if (!isWindows) {
+ EngineExecutionContext engineExecutionContext =
+ new EngineExecutionContext(shellEngineConnExecutor, Utils.getJvmUser());
+ ExecuteResponse response = shellEngineConnExecutor.executeLine(engineExecutionContext, "id");
+ Assertions.assertNotNull(response);
+ shellEngineConnExecutor.close();
+ }
+ }
+}
diff --git a/linkis-engineconn-plugins/shell/src/test/scala/org/apache/linkis/manager/engineplugin/shell/executor/TestShellEngineConnExecutor.scala b/linkis-engineconn-plugins/shell/src/test/scala/org/apache/linkis/manager/engineplugin/shell/executor/TestShellEngineConnExecutor.scala
deleted file mode 100644
index 833db129f..000000000
--- a/linkis-engineconn-plugins/shell/src/test/scala/org/apache/linkis/manager/engineplugin/shell/executor/TestShellEngineConnExecutor.scala
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.linkis.manager.engineplugin.shell.executor
-
-import org.apache.linkis.DataWorkCloudApplication
-import org.apache.linkis.common.conf.DWCArgumentsParser
-import org.apache.linkis.common.utils.Utils
-import org.apache.linkis.engineconn.computation.executor.execute.EngineExecutionContext
-
-import scala.collection.mutable
-
-import org.junit.jupiter.api.{Assertions, Test}
-
-class TestShellEngineConnExecutor {
-
- @Test
- def testShellEngineConnExecutor: Unit = {
- val isWindows = System.getProperty("os.name").startsWith("Windows")
-
- System.setProperty("wds.linkis.server.version", "v1")
- System.setProperty("HADOOP_CONF_DIR", "./")
- System.setProperty(
- "wds.linkis.engineconn.plugin.default.class",
- "org.apache.linkis.manager.engineplugin.shell.ShellEngineConnPlugin"
- )
- val map = new mutable.HashMap[String, String]()
- map.put("spring.mvc.servlet.path", "/api/rest_j/v1")
- map.put("server.port", "26380")
- map.put("spring.application.name", "shellEngineExecutor")
- map.put("eureka.client.register-with-eureka", "false")
- map.put("eureka.client.fetch-registry", "false")
- DataWorkCloudApplication.main(DWCArgumentsParser.formatSpringOptions(map.toMap))
- val shellEngineConnExecutor = new ShellEngineConnExecutor(1)
- shellEngineConnExecutor.init()
- Assertions.assertTrue(shellEngineConnExecutor.isEngineInitialized)
- if (!isWindows) {
-
- val engineExecutionContext =
- new EngineExecutionContext(shellEngineConnExecutor, Utils.getJvmUser)
- val response = shellEngineConnExecutor.executeLine(engineExecutionContext, "id")
- Assertions.assertNotNull(response)
- shellEngineConnExecutor.close()
- }
-
- }
-
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org