You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@linkis.apache.org by pe...@apache.org on 2023/04/27 13:37:02 UTC

[linkis] branch dev-1.4.0 updated: Translate engineconn-plugins-shell service classes from Scala to Java (#4473)

This is an automated email from the ASF dual-hosted git repository.

peacewong pushed a commit to branch dev-1.4.0
in repository https://gitbox.apache.org/repos/asf/linkis.git


The following commit(s) were added to refs/heads/dev-1.4.0 by this push:
     new db37eed39 Translate engineconn-plugins-shell  service classes from Scala to Java (#4473)
db37eed39 is described below

commit db37eed39c3770c580e9c1a5d649c0c94064e5d2
Author: ChengJie1053 <18...@163.com>
AuthorDate: Thu Apr 27 21:36:53 2023 +0800

    Translate engineconn-plugins-shell  service classes from Scala to Java (#4473)
    
    * Remove useless code
---
 .../engineplugin/shell/ShellEngineConnPlugin.java  |  81 +++++
 .../ShellProcessEngineConnLaunchBuilder.java}      |   6 +-
 .../shell/common/ShellEngineConnPluginConst.java}  |   9 +-
 .../shell/conf/ShellEngineConnConf.java}           |  24 +-
 .../shell/exception/NoCorrectUserException.java}   |  12 +-
 .../shell/exception/ShellCodeErrorException.java}  |  16 +-
 .../engineplugin/shell/executor/ReaderThread.java  |  97 ++++++
 .../shell/executor/ShellECTaskInfo.java            |  54 ++++
 .../ShellEngineConnConcurrentExecutor.java         | 358 +++++++++++++++++++++
 .../shell/executor/ShellEngineConnExecutor.java    | 330 +++++++++++++++++++
 .../shell/executor/YarnAppIdExtractor.java         |  53 +++
 .../engineplugin/shell/ShellEngineConnPlugin.scala |  76 -----
 .../shell/exception/NoCorrectUserException.scala   |  27 --
 .../engineplugin/shell/executor/ReaderThread.scala |  96 ------
 .../ShellEngineConnConcurrentExecutor.scala        | 348 --------------------
 .../shell/executor/ShellEngineConnExecutor.scala   | 319 ------------------
 .../shell/executor/YarnAppIdExtractor.scala        |  81 -----
 .../shell/factory/ShellEngineConnFactory.scala     |   0
 .../shell/TestShellEngineConnPlugin.java}          |  20 +-
 .../common/TestShellEngineConnPluginConst.java}    |  17 +-
 .../exception/TestNoCorrectUserException.java}     |  20 +-
 .../executor/TestShellEngineConnExecutor.java      |  61 ++++
 .../executor/TestShellEngineConnExecutor.scala     |  62 ----
 23 files changed, 1092 insertions(+), 1075 deletions(-)

diff --git a/linkis-engineconn-plugins/shell/src/main/java/org/apache/linkis/manager/engineplugin/shell/ShellEngineConnPlugin.java b/linkis-engineconn-plugins/shell/src/main/java/org/apache/linkis/manager/engineplugin/shell/ShellEngineConnPlugin.java
new file mode 100644
index 000000000..704f1cd6e
--- /dev/null
+++ b/linkis-engineconn-plugins/shell/src/main/java/org/apache/linkis/manager/engineplugin/shell/ShellEngineConnPlugin.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.manager.engineplugin.shell;
+
+import org.apache.linkis.manager.engineplugin.common.EngineConnPlugin;
+import org.apache.linkis.manager.engineplugin.common.creation.EngineConnFactory;
+import org.apache.linkis.manager.engineplugin.common.launch.EngineConnLaunchBuilder;
+import org.apache.linkis.manager.engineplugin.common.resource.EngineResourceFactory;
+import org.apache.linkis.manager.engineplugin.common.resource.GenericEngineResourceFactory;
+import org.apache.linkis.manager.engineplugin.shell.builder.ShellProcessEngineConnLaunchBuilder;
+import org.apache.linkis.manager.engineplugin.shell.factory.ShellEngineConnFactory;
+import org.apache.linkis.manager.label.entity.Label;
+import org.apache.linkis.manager.label.entity.engine.EngineType;
+import org.apache.linkis.manager.label.utils.EngineTypeLabelCreator;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class ShellEngineConnPlugin implements EngineConnPlugin {
+  private Object resourceLocker = new Object();
+  private Object engineFactoryLocker = new Object();
+  private volatile EngineResourceFactory engineResourceFactory;
+  private volatile EngineConnFactory engineFactory;
+  private List<Label<?>> defaultLabels = new ArrayList<>();
+
+  public void init(Map<String, Object> params) {
+    Label<?> engineTypeLabel =
+        EngineTypeLabelCreator.createEngineTypeLabel(EngineType.SHELL().toString());
+    this.defaultLabels.add(engineTypeLabel);
+  }
+
+  @Override
+  public EngineResourceFactory getEngineResourceFactory() {
+    if (engineResourceFactory == null) {
+      synchronized (resourceLocker) {
+        if (engineResourceFactory == null) {
+          engineResourceFactory = new GenericEngineResourceFactory();
+        }
+      }
+    }
+    return engineResourceFactory;
+  }
+
+  @Override
+  public EngineConnLaunchBuilder getEngineConnLaunchBuilder() {
+    return new ShellProcessEngineConnLaunchBuilder();
+  }
+
+  @Override
+  public EngineConnFactory getEngineConnFactory() {
+    if (engineFactory == null) {
+      synchronized (engineFactoryLocker) {
+        if (engineFactory == null) {
+          engineFactory = new ShellEngineConnFactory();
+        }
+      }
+    }
+    return engineFactory;
+  }
+
+  @Override
+  public List<Label<?>> getDefaultLabels() {
+    return this.defaultLabels;
+  }
+}
diff --git a/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/builder/ShellProcessEngineConnLaunchBuilder.scala b/linkis-engineconn-plugins/shell/src/main/java/org/apache/linkis/manager/engineplugin/shell/builder/ShellProcessEngineConnLaunchBuilder.java
similarity index 81%
rename from linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/builder/ShellProcessEngineConnLaunchBuilder.scala
rename to linkis-engineconn-plugins/shell/src/main/java/org/apache/linkis/manager/engineplugin/shell/builder/ShellProcessEngineConnLaunchBuilder.java
index 23668f32a..c030db4e2 100644
--- a/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/builder/ShellProcessEngineConnLaunchBuilder.scala
+++ b/linkis-engineconn-plugins/shell/src/main/java/org/apache/linkis/manager/engineplugin/shell/builder/ShellProcessEngineConnLaunchBuilder.java
@@ -15,8 +15,8 @@
  * limitations under the License.
  */
 
-package org.apache.linkis.manager.engineplugin.shell.builder
+package org.apache.linkis.manager.engineplugin.shell.builder;
 
-import org.apache.linkis.manager.engineplugin.common.launch.process.JavaProcessEngineConnLaunchBuilder
+import org.apache.linkis.manager.engineplugin.common.launch.process.JavaProcessEngineConnLaunchBuilder;
 
-class ShellProcessEngineConnLaunchBuilder extends JavaProcessEngineConnLaunchBuilder {}
+public class ShellProcessEngineConnLaunchBuilder extends JavaProcessEngineConnLaunchBuilder {}
diff --git a/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/common/ShellEnginePluginConst.scala b/linkis-engineconn-plugins/shell/src/main/java/org/apache/linkis/manager/engineplugin/shell/common/ShellEngineConnPluginConst.java
similarity index 73%
rename from linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/common/ShellEnginePluginConst.scala
rename to linkis-engineconn-plugins/shell/src/main/java/org/apache/linkis/manager/engineplugin/shell/common/ShellEngineConnPluginConst.java
index 34771e68b..30fd9cf68 100644
--- a/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/common/ShellEnginePluginConst.scala
+++ b/linkis-engineconn-plugins/shell/src/main/java/org/apache/linkis/manager/engineplugin/shell/common/ShellEngineConnPluginConst.java
@@ -15,9 +15,10 @@
  * limitations under the License.
  */
 
-package org.apache.linkis.manager.engineplugin.shell.common
+package org.apache.linkis.manager.engineplugin.shell.common;
 
-object ShellEngineConnPluginConst {
-  final val RUNTIME_ARGS_KEY: String = "extraArguments"
-  final val SHELL_RUNTIME_WORKING_DIRECTORY: String = "wds.linkis.shell.runtime.working.directory"
+public class ShellEngineConnPluginConst {
+  public static final String RUNTIME_ARGS_KEY = "extraArguments";
+  public static final String SHELL_RUNTIME_WORKING_DIRECTORY =
+      "wds.linkis.shell.runtime.working.directory";
 }
diff --git a/linkis-engineconn-plugins/shell/src/test/scala/org/apache/linkis/manager/engineplugin/shell/exception/TestNoCorrectUserException.scala b/linkis-engineconn-plugins/shell/src/main/java/org/apache/linkis/manager/engineplugin/shell/conf/ShellEngineConnConf.java
similarity index 65%
copy from linkis-engineconn-plugins/shell/src/test/scala/org/apache/linkis/manager/engineplugin/shell/exception/TestNoCorrectUserException.scala
copy to linkis-engineconn-plugins/shell/src/main/java/org/apache/linkis/manager/engineplugin/shell/conf/ShellEngineConnConf.java
index 50ba61bb0..3849bc1ee 100644
--- a/linkis-engineconn-plugins/shell/src/test/scala/org/apache/linkis/manager/engineplugin/shell/exception/TestNoCorrectUserException.scala
+++ b/linkis-engineconn-plugins/shell/src/main/java/org/apache/linkis/manager/engineplugin/shell/conf/ShellEngineConnConf.java
@@ -15,24 +15,14 @@
  * limitations under the License.
  */
 
-package org.apache.linkis.manager.engineplugin.shell.exception
+package org.apache.linkis.manager.engineplugin.shell.conf;
 
-import org.junit.jupiter.api.{Assertions, Test}
+import org.apache.linkis.common.conf.CommonVars;
 
-class TestNoCorrectUserException {
-
-  @Test
-  def testNoCorrectUserException: Unit = {
-
-    val exception = NoCorrectUserException
-    Assertions.assertNotNull(exception)
-  }
-
-  @Test
-  def testShellCodeErrorException: Unit = {
-
-    val exception = ShellCodeErrorException
-    Assertions.assertNotNull(exception)
-  }
+public class ShellEngineConnConf {
+  public static final int SHELL_ENGINECONN_CONCURRENT_LIMIT =
+      CommonVars.apply("linkis.engineconn.shell.concurrent.limit", 30).getValue();
 
+  public static final int LOG_SERVICE_MAX_THREAD_SIZE =
+      CommonVars.apply("linkis.engineconn.shell.log.max.thread.size", 50).getValue();
 }
diff --git a/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/ShellECTaskInfo.scala b/linkis-engineconn-plugins/shell/src/main/java/org/apache/linkis/manager/engineplugin/shell/exception/NoCorrectUserException.java
similarity index 65%
rename from linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/ShellECTaskInfo.scala
rename to linkis-engineconn-plugins/shell/src/main/java/org/apache/linkis/manager/engineplugin/shell/exception/NoCorrectUserException.java
index d8d7edaa3..6c5d106f6 100644
--- a/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/ShellECTaskInfo.scala
+++ b/linkis-engineconn-plugins/shell/src/main/java/org/apache/linkis/manager/engineplugin/shell/exception/NoCorrectUserException.java
@@ -15,6 +15,14 @@
  * limitations under the License.
  */
 
-package org.apache.linkis.manager.engineplugin.shell.executor
+package org.apache.linkis.manager.engineplugin.shell.exception;
 
-case class ShellECTaskInfo(taskId: String, process: Process, yarnAppIdExtractor: YarnAppIdExtractor)
+import org.apache.linkis.common.exception.ErrorException;
+
+import static org.apache.linkis.manager.engineplugin.shell.errorcode.LinkisCommonsErrorCodeSummary.*;
+
+public class NoCorrectUserException extends ErrorException {
+  public NoCorrectUserException() {
+    super(NO_ILLEGAL_USER_HOLDS.getErrorCode(), NO_ILLEGAL_USER_HOLDS.getErrorDesc());
+  }
+}
diff --git a/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/conf/ShellEngineConnConf.scala b/linkis-engineconn-plugins/shell/src/main/java/org/apache/linkis/manager/engineplugin/shell/exception/ShellCodeErrorException.java
similarity index 66%
rename from linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/conf/ShellEngineConnConf.scala
rename to linkis-engineconn-plugins/shell/src/main/java/org/apache/linkis/manager/engineplugin/shell/exception/ShellCodeErrorException.java
index ba74dbbad..49b41ad03 100644
--- a/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/conf/ShellEngineConnConf.scala
+++ b/linkis-engineconn-plugins/shell/src/main/java/org/apache/linkis/manager/engineplugin/shell/exception/ShellCodeErrorException.java
@@ -15,16 +15,14 @@
  * limitations under the License.
  */
 
-package org.apache.linkis.manager.engineplugin.shell.conf
+package org.apache.linkis.manager.engineplugin.shell.exception;
 
-import org.apache.linkis.common.conf.CommonVars
+import org.apache.linkis.common.exception.ErrorException;
 
-object ShellEngineConnConf {
-
-  val SHELL_ENGINECONN_CONCURRENT_LIMIT: Int =
-    CommonVars[Int]("linkis.engineconn.shell.concurrent.limit", 30).getValue
-
-  val LOG_SERVICE_MAX_THREAD_SIZE: Int =
-    CommonVars("linkis.engineconn.shell.log.max.thread.size", 50).getValue
+import static org.apache.linkis.manager.engineplugin.shell.errorcode.LinkisCommonsErrorCodeSummary.*;
 
+public class ShellCodeErrorException extends ErrorException {
+  public ShellCodeErrorException() {
+    super(SHELL_CODE_IS_WRONG.getErrorCode(), SHELL_CODE_IS_WRONG.getErrorDesc());
+  }
 }
diff --git a/linkis-engineconn-plugins/shell/src/main/java/org/apache/linkis/manager/engineplugin/shell/executor/ReaderThread.java b/linkis-engineconn-plugins/shell/src/main/java/org/apache/linkis/manager/engineplugin/shell/executor/ReaderThread.java
new file mode 100644
index 000000000..8b3e7cad5
--- /dev/null
+++ b/linkis-engineconn-plugins/shell/src/main/java/org/apache/linkis/manager/engineplugin/shell/executor/ReaderThread.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.manager.engineplugin.shell.executor;
+
+import org.apache.linkis.common.conf.CommonVars;
+import org.apache.linkis.engineconn.computation.executor.execute.EngineExecutionContext;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ReaderThread extends Thread {
+  private static final Logger logger = LoggerFactory.getLogger(ReaderThread.class);
+
+  private EngineExecutionContext engineExecutionContext;
+  private BufferedReader inputReader;
+  private YarnAppIdExtractor extractor;
+  private boolean isStdout;
+  private final int logListCount =
+      CommonVars.apply("wds.linkis.engineconn.log.list.count", 50).getValue();
+  private CountDownLatch counter;
+
+  private boolean isReaderAlive = true;
+
+  public ReaderThread(
+      EngineExecutionContext engineExecutionContext,
+      BufferedReader inputReader,
+      YarnAppIdExtractor extractor,
+      boolean isStdout,
+      CountDownLatch counter) {
+    this.engineExecutionContext = engineExecutionContext;
+    this.inputReader = inputReader;
+    this.extractor = extractor;
+    this.isStdout = isStdout;
+    this.counter = counter;
+  }
+
+  public void onDestroy() {
+    isReaderAlive = false;
+  }
+
+  @Override
+  public void run() {
+    String line = null;
+    List<String> logArray = new ArrayList<>();
+    while (true) {
+      try {
+        line = inputReader.readLine();
+        if (!(line != null && isReaderAlive)) break;
+      } catch (IOException e) {
+        logger.warn("inputReader reading the input stream");
+        break;
+      }
+      logger.info("read logger line :{}", line);
+      logArray.add(line);
+      extractor.appendLineToExtractor(line);
+      if (isStdout) {
+        engineExecutionContext.appendTextResultSet(line);
+      }
+      if (logArray.size() > logListCount) {
+        String linelist = StringUtils.join(logArray, "\n");
+        engineExecutionContext.appendStdout(linelist);
+        logArray.clear();
+      }
+    }
+    if (logArray.size() > 0) {
+      String linelist = StringUtils.join(logArray, "\n");
+      engineExecutionContext.appendStdout(linelist);
+      logArray.clear();
+    }
+    IOUtils.closeQuietly(inputReader);
+    counter.countDown();
+  }
+}
diff --git a/linkis-engineconn-plugins/shell/src/main/java/org/apache/linkis/manager/engineplugin/shell/executor/ShellECTaskInfo.java b/linkis-engineconn-plugins/shell/src/main/java/org/apache/linkis/manager/engineplugin/shell/executor/ShellECTaskInfo.java
new file mode 100644
index 000000000..dc843327d
--- /dev/null
+++ b/linkis-engineconn-plugins/shell/src/main/java/org/apache/linkis/manager/engineplugin/shell/executor/ShellECTaskInfo.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.manager.engineplugin.shell.executor;
+
+public class ShellECTaskInfo {
+  private String taskId;
+  private Process process;
+  private YarnAppIdExtractor yarnAppIdExtractor;
+
+  public ShellECTaskInfo(String taskId, Process process, YarnAppIdExtractor yarnAppIdExtractor) {
+    this.taskId = taskId;
+    this.process = process;
+    this.yarnAppIdExtractor = yarnAppIdExtractor;
+  }
+
+  public String getTaskId() {
+    return taskId;
+  }
+
+  public void setTaskId(String taskId) {
+    this.taskId = taskId;
+  }
+
+  public Process getProcess() {
+    return process;
+  }
+
+  public void setProcess(Process process) {
+    this.process = process;
+  }
+
+  public YarnAppIdExtractor getYarnAppIdExtractor() {
+    return yarnAppIdExtractor;
+  }
+
+  public void setYarnAppIdExtractor(YarnAppIdExtractor yarnAppIdExtractor) {
+    this.yarnAppIdExtractor = yarnAppIdExtractor;
+  }
+}
diff --git a/linkis-engineconn-plugins/shell/src/main/java/org/apache/linkis/manager/engineplugin/shell/executor/ShellEngineConnConcurrentExecutor.java b/linkis-engineconn-plugins/shell/src/main/java/org/apache/linkis/manager/engineplugin/shell/executor/ShellEngineConnConcurrentExecutor.java
new file mode 100644
index 000000000..3f445ddd5
--- /dev/null
+++ b/linkis-engineconn-plugins/shell/src/main/java/org/apache/linkis/manager/engineplugin/shell/executor/ShellEngineConnConcurrentExecutor.java
@@ -0,0 +1,358 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.manager.engineplugin.shell.executor;
+
+import org.apache.linkis.common.utils.Utils;
+import org.apache.linkis.engineconn.computation.executor.execute.ConcurrentComputationExecutor;
+import org.apache.linkis.engineconn.computation.executor.execute.EngineExecutionContext;
+import org.apache.linkis.engineconn.core.EngineConnObject;
+import org.apache.linkis.governance.common.utils.GovernanceUtils;
+import org.apache.linkis.manager.common.entity.resource.CommonNodeResource;
+import org.apache.linkis.manager.common.entity.resource.NodeResource;
+import org.apache.linkis.manager.engineplugin.common.util.NodeResourceUtils;
+import org.apache.linkis.manager.engineplugin.shell.common.ShellEngineConnPluginConst;
+import org.apache.linkis.manager.engineplugin.shell.conf.ShellEngineConnConf;
+import org.apache.linkis.manager.engineplugin.shell.exception.ShellCodeErrorException;
+import org.apache.linkis.manager.label.entity.Label;
+import org.apache.linkis.protocol.engine.JobProgressInfo;
+import org.apache.linkis.rpc.Sender;
+import org.apache.linkis.scheduler.executer.ErrorExecuteResponse;
+import org.apache.linkis.scheduler.executer.ExecuteResponse;
+import org.apache.linkis.scheduler.executer.SuccessExecuteResponse;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.InputStreamReader;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import scala.concurrent.ExecutionContextExecutorService;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ShellEngineConnConcurrentExecutor extends ConcurrentComputationExecutor {
+  private static final Logger logger =
+      LoggerFactory.getLogger(ShellEngineConnConcurrentExecutor.class);
+
+  private EngineExecutionContext engineExecutionContext;
+
+  private List<Label<?>> executorLabels = new ArrayList<>();
+
+  private Map<String, ShellECTaskInfo> shellECTaskInfoCache = new ConcurrentHashMap<>();
+
+  private int id;
+  private int maxRunningNumber;
+
+  public ShellEngineConnConcurrentExecutor(int id, int maxRunningNumber) {
+    super(maxRunningNumber);
+    this.id = id;
+    this.maxRunningNumber = maxRunningNumber;
+  }
+
+  private final ExecutionContextExecutorService logAsyncService =
+      Utils.newCachedExecutionContext(
+          ShellEngineConnConf.LOG_SERVICE_MAX_THREAD_SIZE, "ShelLogService-Thread-", true);
+
+  @Override
+  public void init() {
+    logger.info("Ready to change engine state!");
+    super.init();
+  }
+
+  @Override
+  public ExecuteResponse executeCompletely(
+      EngineExecutionContext engineExecutionContext, String code, String completedLine) {
+    String newcode = completedLine + code;
+    logger.debug("newcode is " + newcode);
+    return executeLine(engineExecutionContext, newcode);
+  }
+
+  @Override
+  public ExecuteResponse executeLine(EngineExecutionContext engineExecutionContext, String code) {
+    if (engineExecutionContext != null) {
+      this.engineExecutionContext = engineExecutionContext;
+      logger.info("Shell executor reset new engineExecutionContext!");
+    }
+
+    if (engineExecutionContext.getJobId().isEmpty()) {
+      return new ErrorExecuteResponse("taskID is null", null);
+    }
+
+    String taskId = engineExecutionContext.getJobId().get();
+    BufferedReader bufferedReader = null;
+    BufferedReader errorsReader = null;
+
+    AtomicBoolean completed = new AtomicBoolean(false);
+    ReaderThread errReaderThread = null;
+    ReaderThread inputReaderThread = null;
+
+    try {
+      engineExecutionContext.appendStdout(getId() + " >> " + code.trim());
+
+      String[] argsArr;
+      if (engineExecutionContext.getTotalParagraph() == 1
+          && engineExecutionContext.getProperties() != null
+          && engineExecutionContext
+              .getProperties()
+              .containsKey(ShellEngineConnPluginConst.RUNTIME_ARGS_KEY)) {
+        ArrayList<String> argsList =
+            (ArrayList<String>)
+                engineExecutionContext
+                    .getProperties()
+                    .get(ShellEngineConnPluginConst.RUNTIME_ARGS_KEY);
+        argsArr = argsList.toArray(new String[argsList.size()]);
+        logger.info(
+            "Will execute shell task with user-specified arguments: '{}'",
+            StringUtils.join(argsArr, "' '"));
+      } else {
+        argsArr = null;
+      }
+
+      String workingDirectory;
+      if (engineExecutionContext.getTotalParagraph() == 1
+          && engineExecutionContext.getProperties() != null
+          && engineExecutionContext
+              .getProperties()
+              .containsKey(ShellEngineConnPluginConst.SHELL_RUNTIME_WORKING_DIRECTORY)) {
+        String wdStr =
+            (String)
+                engineExecutionContext
+                    .getProperties()
+                    .get(ShellEngineConnPluginConst.SHELL_RUNTIME_WORKING_DIRECTORY);
+        if (isExecutePathExist(wdStr)) {
+          logger.info(
+              "Will execute shell task under user-specified working-directory: '{}'", wdStr);
+          workingDirectory = wdStr;
+        } else {
+          logger.warn(
+              "User-specified working-directory: '{}' does not exist or user does not have access permission. Will execute shell task under default working-directory. Please contact the administrator!",
+              wdStr);
+          workingDirectory = null;
+        }
+      } else {
+        workingDirectory = null;
+      }
+
+      String[] generatedCode =
+          argsArr == null || argsArr.length == 0
+              ? generateRunCode(code)
+              : generateRunCodeWithArgs(code, argsArr);
+
+      ProcessBuilder processBuilder = new ProcessBuilder(generatedCode);
+
+      if (StringUtils.isNotBlank(workingDirectory)) {
+        processBuilder.directory(new File(workingDirectory));
+      }
+
+      processBuilder.redirectErrorStream(false);
+      YarnAppIdExtractor extractor = new YarnAppIdExtractor();
+      Process process = processBuilder.start();
+      bufferedReader = new BufferedReader(new InputStreamReader(process.getInputStream()));
+      errorsReader = new BufferedReader(new InputStreamReader(process.getErrorStream()));
+
+      // add task id and task Info cache
+      shellECTaskInfoCache.put(taskId, new ShellECTaskInfo(taskId, process, extractor));
+
+      CountDownLatch counter = new CountDownLatch(2);
+      inputReaderThread =
+          new ReaderThread(engineExecutionContext, bufferedReader, extractor, true, counter);
+      errReaderThread =
+          new ReaderThread(engineExecutionContext, errorsReader, extractor, false, counter);
+
+      logAsyncService.execute(inputReaderThread);
+      logAsyncService.execute(errReaderThread);
+
+      int exitCode = process.waitFor();
+      counter.await();
+
+      completed.set(true);
+
+      if (exitCode != 0) {
+        return new ErrorExecuteResponse("run shell failed", new ShellCodeErrorException());
+      } else {
+        return new SuccessExecuteResponse();
+      }
+    } catch (Exception e) {
+      logger.error("Execute shell code failed, reason:", e);
+      return new ErrorExecuteResponse("run shell failed", e);
+    } finally {
+      if (errorsReader != null) {
+        errReaderThread.onDestroy();
+      }
+      if (inputReaderThread != null) {
+        inputReaderThread.onDestroy();
+      }
+      shellECTaskInfoCache.remove(taskId);
+    }
+  }
+
+  private boolean isExecutePathExist(String executePath) {
+    File etlHomeDir = new File(executePath);
+    return (etlHomeDir.exists() && etlHomeDir.isDirectory());
+  }
+
+  private String[] generateRunCode(String code) {
+    return new String[] {"sh", "-c", code};
+  }
+
+  private String[] generateRunCodeWithArgs(String code, String[] args) {
+    return new String[] {
+      "sh",
+      "-c",
+      "echo \"dummy " + StringUtils.join(args, " ") + "\" | xargs sh -c \'" + code + "\'"
+    };
+  }
+
+  @Override
+  public String getId() {
+    return Sender.getThisServiceInstance().getInstance() + "_" + id;
+  }
+
+  @Override
+  public JobProgressInfo[] getProgressInfo(String taskID) {
+    List<JobProgressInfo> jobProgressInfos = new ArrayList<>();
+    if (this.engineExecutionContext == null) {
+      return jobProgressInfos.toArray(new JobProgressInfo[0]);
+    }
+
+    String jobId =
+        engineExecutionContext.getJobId().isDefined()
+            ? engineExecutionContext.getJobId().get()
+            : "";
+    if (progress(taskID) == 0.0f) {
+      jobProgressInfos.add(new JobProgressInfo(jobId, 1, 1, 0, 0));
+    } else {
+      jobProgressInfos.add(new JobProgressInfo(jobId, 1, 0, 0, 1));
+    }
+    return jobProgressInfos.toArray(new JobProgressInfo[0]);
+  }
+
+  @Override
+  public float progress(String taskID) {
+    if (this.engineExecutionContext != null) {
+      return ((float) this.engineExecutionContext.getCurrentParagraph())
+          / this.engineExecutionContext.getTotalParagraph();
+    } else {
+      return 0.0f;
+    }
+  }
+
+  @Override
+  public boolean supportCallBackLogs() {
+    // todo
+    return true;
+  }
+
+  @Override
+  public NodeResource requestExpectedResource(NodeResource expectedResource) {
+    return null;
+  }
+
+  @Override
+  public NodeResource getCurrentNodeResource() {
+    CommonNodeResource resource = new CommonNodeResource();
+    resource.setUsedResource(
+        NodeResourceUtils.applyAsLoadInstanceResource(
+            EngineConnObject.getEngineCreationContext().getOptions()));
+    return resource;
+  }
+
+  @Override
+  public List<Label<?>> getExecutorLabels() {
+    return executorLabels;
+  }
+
+  @Override
+  public void setExecutorLabels(List<Label<?>> labels) {
+    if (labels != null) {
+      executorLabels.clear();
+      executorLabels.addAll(labels);
+    }
+  }
+
+  @Override
+  public void killTask(String taskID) {
+    ShellECTaskInfo shellECTaskInfo = shellECTaskInfoCache.remove(taskID);
+    if (shellECTaskInfo == null) {
+      return;
+    }
+
+    /*
+     Kill sub-processes
+    */
+    int pid = getPid(shellECTaskInfo.getProcess());
+    GovernanceUtils.killProcess(String.valueOf(pid), "kill task " + taskID + " process", false);
+
+    /*
+     Kill yarn-applications
+    */
+    List<String> yarnAppIds = shellECTaskInfo.getYarnAppIdExtractor().getExtractedYarnAppIds();
+    GovernanceUtils.killYarnJobApp(yarnAppIds);
+    logger.info(
+        "Finished kill yarn app ids in the engine of ({}). The YARN app ids are {}.",
+        getId(),
+        yarnAppIds);
+    super.killTask(taskID);
+  }
+
+  private int getPid(Process process) {
+    try {
+      Class<?> clazz = Class.forName("java.lang.UNIXProcess");
+      Field field = clazz.getDeclaredField("pid");
+      field.setAccessible(true);
+      return field.getInt(process);
+    } catch (Exception e) {
+      logger.warn("Failed to acquire pid for shell process");
+      return -1;
+    }
+  }
+
+  @Override
+  public void close() {
+    try {
+      killAll();
+      logAsyncService.shutdown();
+    } catch (Exception e) {
+      logger.error("Shell ec failed to close ");
+    }
+    super.close();
+  }
+
+  @Override
+  public void killAll() {
+    Iterator<ShellECTaskInfo> iterator = shellECTaskInfoCache.values().iterator();
+    while (iterator.hasNext()) {
+      ShellECTaskInfo shellECTaskInfo = iterator.next();
+      killTask(shellECTaskInfo.getTaskId());
+    }
+  }
+
+  @Override
+  public int getConcurrentLimit() {
+    return maxRunningNumber;
+  }
+}
diff --git a/linkis-engineconn-plugins/shell/src/main/java/org/apache/linkis/manager/engineplugin/shell/executor/ShellEngineConnExecutor.java b/linkis-engineconn-plugins/shell/src/main/java/org/apache/linkis/manager/engineplugin/shell/executor/ShellEngineConnExecutor.java
new file mode 100644
index 000000000..964417765
--- /dev/null
+++ b/linkis-engineconn-plugins/shell/src/main/java/org/apache/linkis/manager/engineplugin/shell/executor/ShellEngineConnExecutor.java
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.manager.engineplugin.shell.executor;
+
+import org.apache.linkis.engineconn.computation.executor.execute.ComputationExecutor;
+import org.apache.linkis.engineconn.computation.executor.execute.EngineExecutionContext;
+import org.apache.linkis.engineconn.core.EngineConnObject;
+import org.apache.linkis.governance.common.utils.GovernanceUtils;
+import org.apache.linkis.manager.common.entity.resource.CommonNodeResource;
+import org.apache.linkis.manager.common.entity.resource.NodeResource;
+import org.apache.linkis.manager.engineplugin.common.util.NodeResourceUtils;
+import org.apache.linkis.manager.engineplugin.shell.common.ShellEngineConnPluginConst;
+import org.apache.linkis.manager.engineplugin.shell.exception.ShellCodeErrorException;
+import org.apache.linkis.manager.label.entity.Label;
+import org.apache.linkis.protocol.engine.JobProgressInfo;
+import org.apache.linkis.rpc.Sender;
+import org.apache.linkis.scheduler.executer.ErrorExecuteResponse;
+import org.apache.linkis.scheduler.executer.ExecuteResponse;
+import org.apache.linkis.scheduler.executer.SuccessExecuteResponse;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.InputStreamReader;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ShellEngineConnExecutor extends ComputationExecutor {
+  private static final Logger logger = LoggerFactory.getLogger(ShellEngineConnExecutor.class);
+
+  private int id;
+  private EngineExecutionContext engineExecutionContext;
+  private List<Label<?>> executorLabels = new ArrayList<>();
+  private Process process;
+  private YarnAppIdExtractor extractor;
+
+  public ShellEngineConnExecutor(int id) {
+    super(id);
+    this.id = id;
+  }
+
+  @Override
+  public void init() {
+    logger.info("Ready to change engine state!");
+    super.init();
+  }
+
+  @Override
+  public ExecuteResponse executeCompletely(
+      EngineExecutionContext engineExecutionContext, String code, String completedLine) {
+    final String newcode = completedLine + code;
+    logger.debug("newcode is " + newcode);
+    return executeLine(engineExecutionContext, newcode);
+  }
+
+  @Override
+  public ExecuteResponse executeLine(EngineExecutionContext engineExecutionContext, String code) {
+    if (engineExecutionContext != null) {
+      this.engineExecutionContext = engineExecutionContext;
+      logger.info("Shell executor reset new engineExecutionContext!");
+    }
+
+    BufferedReader bufferedReader = null;
+    BufferedReader errorsReader = null;
+
+    AtomicBoolean completed = new AtomicBoolean(false);
+    ReaderThread errReaderThread = null;
+    ReaderThread inputReaderThread = null;
+
+    try {
+      engineExecutionContext.appendStdout(getId() + " >> " + code.trim());
+
+      String[] argsArr = null;
+      if (engineExecutionContext.getTotalParagraph() == 1
+          && engineExecutionContext.getProperties() != null
+          && engineExecutionContext
+              .getProperties()
+              .containsKey(ShellEngineConnPluginConst.RUNTIME_ARGS_KEY)) {
+
+        ArrayList<String> argsList =
+            (ArrayList<String>)
+                engineExecutionContext
+                    .getProperties()
+                    .get(ShellEngineConnPluginConst.RUNTIME_ARGS_KEY);
+
+        try {
+          argsArr = argsList.toArray(new String[argsList.size()]);
+          logger.info(
+              "Will execute shell task with user-specified arguments: '{}'",
+              Arrays.toString(argsArr));
+        } catch (Exception t) {
+          logger.warn(
+              "Cannot read user-input shell arguments. Will execute shell task without them.", t);
+        }
+      }
+
+      String workingDirectory = null;
+      if (engineExecutionContext.getTotalParagraph() == 1
+          && engineExecutionContext.getProperties() != null
+          && engineExecutionContext
+              .getProperties()
+              .containsKey(ShellEngineConnPluginConst.SHELL_RUNTIME_WORKING_DIRECTORY)) {
+
+        String wdStr =
+            (String)
+                engineExecutionContext
+                    .getProperties()
+                    .get(ShellEngineConnPluginConst.SHELL_RUNTIME_WORKING_DIRECTORY);
+
+        try {
+          if (isExecutePathExist(wdStr)) {
+            logger.info(
+                "Will execute shell task under user-specified working-directory: '" + wdStr + "'");
+            workingDirectory = wdStr;
+          } else {
+            logger.warn(
+                "User-specified working-directory: '"
+                    + wdStr
+                    + "' does not exist or user does not have access permission. "
+                    + "Will execute shell task under default working-directory. Please contact the administrator!");
+          }
+        } catch (Exception t) {
+          logger.warn(
+              "Cannot read user-input working-directory. Will execute shell task under default working-directory.",
+              t);
+        }
+      }
+
+      String[] generatedCode =
+          argsArr == null || argsArr.length == 0
+              ? generateRunCode(code)
+              : generateRunCodeWithArgs(code, argsArr);
+
+      ProcessBuilder processBuilder = new ProcessBuilder(generatedCode);
+      if (StringUtils.isNotBlank(workingDirectory)) {
+        processBuilder.directory(new File(workingDirectory));
+      }
+
+      processBuilder.redirectErrorStream(false);
+      extractor = new YarnAppIdExtractor();
+      process = processBuilder.start();
+
+      bufferedReader = new BufferedReader(new InputStreamReader(process.getInputStream()));
+      errorsReader = new BufferedReader(new InputStreamReader(process.getErrorStream()));
+      CountDownLatch counter = new CountDownLatch(2);
+      inputReaderThread =
+          new ReaderThread(engineExecutionContext, bufferedReader, extractor, true, counter);
+      errReaderThread =
+          new ReaderThread(engineExecutionContext, errorsReader, extractor, false, counter);
+
+      inputReaderThread.start();
+      errReaderThread.start();
+
+      int exitCode = process.waitFor();
+      counter.await();
+
+      completed.set(true);
+
+      if (exitCode != 0) {
+        return new ErrorExecuteResponse("run shell failed", new ShellCodeErrorException());
+      } else {
+        return new SuccessExecuteResponse();
+      }
+
+    } catch (Exception e) {
+      logger.error("Execute shell code failed, reason:", e);
+      return new ErrorExecuteResponse("run shell failed", e);
+
+    } finally {
+      if (errorsReader != null) {
+        inputReaderThread.onDestroy();
+      }
+      if (inputReaderThread != null) {
+        errReaderThread.onDestroy();
+      }
+      IOUtils.closeQuietly(bufferedReader);
+      IOUtils.closeQuietly(errorsReader);
+    }
+  }
+
+  private boolean isExecutePathExist(String executePath) {
+    File etlHomeDir = new File(executePath);
+    return (etlHomeDir.exists() && etlHomeDir.isDirectory());
+  }
+
+  private String[] generateRunCode(String code) {
+    return new String[] {"sh", "-c", code};
+  }
+
+  private String[] generateRunCodeWithArgs(String code, String[] args) {
+    return new String[] {
+      "sh",
+      "-c",
+      "echo \"dummy " + StringUtils.join(args, " ") + "\" | xargs sh -c \'" + code + "\'"
+    };
+  }
+
+  @Override
+  public String getId() {
+    return Sender.getThisServiceInstance().getInstance() + "_" + id;
+  }
+
+  @Override
+  public JobProgressInfo[] getProgressInfo(String taskID) {
+    List<JobProgressInfo> jobProgressInfo = new ArrayList<>();
+    if (this.engineExecutionContext == null) {
+      return jobProgressInfo.toArray(new JobProgressInfo[0]);
+    }
+
+    String jobId =
+        engineExecutionContext.getJobId().isDefined()
+            ? engineExecutionContext.getJobId().get()
+            : "";
+    if (progress(taskID) == 0.0f) {
+      jobProgressInfo.add(new JobProgressInfo(jobId, 1, 1, 0, 0));
+    } else {
+      jobProgressInfo.add(new JobProgressInfo(jobId, 1, 0, 0, 1));
+    }
+    return jobProgressInfo.toArray(new JobProgressInfo[0]);
+  }
+
+  @Override
+  public float progress(String taskID) {
+    if (this.engineExecutionContext != null) {
+      return this.engineExecutionContext.getCurrentParagraph()
+          / (float) this.engineExecutionContext.getTotalParagraph();
+    } else {
+      return 0.0f;
+    }
+  }
+
+  @Override
+  public boolean supportCallBackLogs() {
+    // todo
+    return true;
+  }
+
+  @Override
+  public NodeResource requestExpectedResource(NodeResource expectedResource) {
+    return null;
+  }
+
+  @Override
+  public NodeResource getCurrentNodeResource() {
+    CommonNodeResource resource = new CommonNodeResource();
+    resource.setUsedResource(
+        NodeResourceUtils.applyAsLoadInstanceResource(
+            EngineConnObject.getEngineCreationContext().getOptions()));
+    return resource;
+  }
+
+  @Override
+  public List<Label<?>> getExecutorLabels() {
+    return executorLabels;
+  }
+
+  @Override
+  public void setExecutorLabels(List<Label<?>> labels) {
+    if (labels != null) {
+      executorLabels.clear();
+      executorLabels.addAll(labels);
+    }
+  }
+
+  @Override
+  public void killTask(String taskID) {
+
+    /*
+     Kill sub-processes
+    */
+    int pid = getPid(process);
+    GovernanceUtils.killProcess(String.valueOf(pid), "kill task " + taskID + " process", false);
+
+    /*
+     Kill yarn-applications
+    */
+    List<String> yarnAppIds = extractor.getExtractedYarnAppIds();
+    GovernanceUtils.killYarnJobApp(yarnAppIds);
+    logger.info("Finished kill yarn app ids in the engine of ({})", getId());
+    super.killTask(taskID);
+  }
+
+  private int getPid(Process process) {
+    try {
+      Class<?> clazz = Class.forName("java.lang.UNIXProcess");
+      Field field = clazz.getDeclaredField("pid");
+      field.setAccessible(true);
+      return field.getInt(process);
+    } catch (Exception e) {
+      logger.warn("Failed to acquire pid for shell process");
+      return -1;
+    }
+  }
+
+  @Override
+  public void close() {
+    try {
+      process.destroy();
+    } catch (Exception e) {
+      logger.error("kill process " + process.toString() + " failed ", e);
+    } catch (Throwable t) {
+      logger.error("kill process " + process.toString() + " failed ", t);
+    }
+    super.close();
+  }
+}
diff --git a/linkis-engineconn-plugins/shell/src/main/java/org/apache/linkis/manager/engineplugin/shell/executor/YarnAppIdExtractor.java b/linkis-engineconn-plugins/shell/src/main/java/org/apache/linkis/manager/engineplugin/shell/executor/YarnAppIdExtractor.java
new file mode 100644
index 000000000..c138e79e6
--- /dev/null
+++ b/linkis-engineconn-plugins/shell/src/main/java/org/apache/linkis/manager/engineplugin/shell/executor/YarnAppIdExtractor.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.manager.engineplugin.shell.executor;
+
+import org.apache.linkis.engineconn.common.conf.EngineConnConf;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.*;
+import java.util.Collections;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class YarnAppIdExtractor {
+
+  private final Set<String> appIdList = Collections.synchronizedSet(new HashSet<>());
+
+  private final String regex =
+      EngineConnConf.SPARK_ENGINE_CONN_YARN_APP_ID_PARSE_REGEX().getValue();
+  private final Pattern pattern = Pattern.compile(regex);
+
+  public void appendLineToExtractor(String content) {
+    if (StringUtils.isBlank(content)) {
+      return;
+    }
+    Matcher yarnAppIDMatcher = pattern.matcher(content);
+    if (yarnAppIDMatcher.find()) {
+      String yarnAppID = yarnAppIDMatcher.group(2);
+      appIdList.add(yarnAppID);
+    }
+  }
+
+  public List<String> getExtractedYarnAppIds() {
+    synchronized (appIdList) {
+      return new ArrayList<>(appIdList);
+    }
+  }
+}
diff --git a/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/ShellEngineConnPlugin.scala b/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/ShellEngineConnPlugin.scala
deleted file mode 100644
index 115b4763e..000000000
--- a/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/ShellEngineConnPlugin.scala
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.linkis.manager.engineplugin.shell
-
-import org.apache.linkis.manager.engineplugin.common.EngineConnPlugin
-import org.apache.linkis.manager.engineplugin.common.creation.EngineConnFactory
-import org.apache.linkis.manager.engineplugin.common.launch.EngineConnLaunchBuilder
-import org.apache.linkis.manager.engineplugin.common.resource.{
-  EngineResourceFactory,
-  GenericEngineResourceFactory
-}
-import org.apache.linkis.manager.engineplugin.shell.builder.ShellProcessEngineConnLaunchBuilder
-import org.apache.linkis.manager.engineplugin.shell.factory.ShellEngineConnFactory
-import org.apache.linkis.manager.label.entity.Label
-import org.apache.linkis.manager.label.entity.engine.EngineType
-import org.apache.linkis.manager.label.utils.EngineTypeLabelCreator
-
-import java.util
-
-class ShellEngineConnPlugin extends EngineConnPlugin {
-
-  private val resourceLocker = new Object()
-
-  private val engineLaunchBuilderLocker = new Object()
-
-  private val engineFactoryLocker = new Object()
-
-  private var engineResourceFactory: EngineResourceFactory = _
-
-  private var engineLaunchBuilder: EngineConnLaunchBuilder = _
-
-  private var engineFactory: EngineConnFactory = _
-
-  private val defaultLabels: util.List[Label[_]] = new util.ArrayList[Label[_]]()
-
-  override def init(params: util.Map[String, AnyRef]): Unit = {
-    val engineTypeLabel = EngineTypeLabelCreator.createEngineTypeLabel(EngineType.SHELL.toString)
-    this.defaultLabels.add(engineTypeLabel)
-  }
-
-  override def getEngineResourceFactory: EngineResourceFactory = {
-    if (null == engineResourceFactory) resourceLocker synchronized {
-      engineResourceFactory = new GenericEngineResourceFactory
-    }
-    engineResourceFactory
-  }
-
-  override def getEngineConnLaunchBuilder: EngineConnLaunchBuilder = {
-    new ShellProcessEngineConnLaunchBuilder
-  }
-
-  override def getEngineConnFactory: EngineConnFactory = {
-    if (null == engineFactory) engineFactoryLocker synchronized {
-      engineFactory = new ShellEngineConnFactory
-    }
-    engineFactory
-  }
-
-  override def getDefaultLabels: util.List[Label[_]] = this.defaultLabels
-
-}
diff --git a/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/exception/NoCorrectUserException.scala b/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/exception/NoCorrectUserException.scala
deleted file mode 100644
index e91c5923c..000000000
--- a/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/exception/NoCorrectUserException.scala
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.linkis.manager.engineplugin.shell.exception
-
-import org.apache.linkis.common.exception.ErrorException
-import org.apache.linkis.manager.engineplugin.shell.errorcode.LinkisCommonsErrorCodeSummary._
-
-case class NoCorrectUserException()
-    extends ErrorException(NO_ILLEGAL_USER_HOLDS.getErrorCode, NO_ILLEGAL_USER_HOLDS.getErrorDesc)
-
-case class ShellCodeErrorException()
-    extends ErrorException(SHELL_CODE_IS_WRONG.getErrorCode, SHELL_CODE_IS_WRONG.getErrorDesc)
diff --git a/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/ReaderThread.scala b/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/ReaderThread.scala
deleted file mode 100644
index 8277cb116..000000000
--- a/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/ReaderThread.scala
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.linkis.manager.engineplugin.shell.executor
-
-import org.apache.linkis.common.conf.CommonVars
-import org.apache.linkis.common.utils.{Logging, Utils}
-import org.apache.linkis.engineconn.computation.executor.execute.EngineExecutionContext
-
-import org.apache.commons.io.IOUtils
-import org.apache.commons.lang3.StringUtils
-
-import java.io.BufferedReader
-import java.util
-import java.util.concurrent.CountDownLatch
-
-class ReaderThread extends Thread with Logging {
-
-  private var engineExecutionContext: EngineExecutionContext = _
-  private var inputReader: BufferedReader = _
-  private var extractor: YarnAppIdExtractor = _
-  private var isStdout: Boolean = false
-  private val logListCount = CommonVars[Int]("wds.linkis.engineconn.log.list.count", 50)
-  private var counter: CountDownLatch = _
-
-  private var isReaderAlive = true
-
-  def this(
-      engineExecutionContext: EngineExecutionContext,
-      inputReader: BufferedReader,
-      extractor: YarnAppIdExtractor,
-      isStdout: Boolean,
-      counter: CountDownLatch
-  ) {
-    this()
-    this.inputReader = inputReader
-    this.engineExecutionContext = engineExecutionContext
-    this.extractor = extractor
-    this.isStdout = isStdout
-    this.counter = counter
-  }
-
-  def onDestroy(): Unit = {
-    isReaderAlive = false
-  }
-
-  def startReaderThread(): Unit = {
-    Utils.tryCatch {
-      this.start()
-    } { t =>
-      throw t
-    }
-  }
-
-  override def run(): Unit = {
-    Utils.tryCatch {
-      var line: String = null
-      val logArray: util.List[String] = new util.ArrayList[String]
-      while ({ line = inputReader.readLine(); line != null && isReaderAlive }) {
-        logger.info("read logger line :{}", line)
-        logArray.add(line)
-        extractor.appendLineToExtractor(line)
-        if (isStdout) engineExecutionContext.appendTextResultSet(line)
-        if (logArray.size > logListCount.getValue) {
-          val linelist = StringUtils.join(logArray, "\n")
-          engineExecutionContext.appendStdout(linelist)
-          logArray.clear()
-        }
-      }
-      if (logArray.size > 0) {
-        val linelist = StringUtils.join(logArray, "\n")
-        engineExecutionContext.appendStdout(linelist)
-        logArray.clear()
-      }
-    } { t =>
-      logger.warn("inputReader reading the input stream", t)
-    }
-    IOUtils.closeQuietly(inputReader)
-    counter.countDown()
-  }
-
-}
diff --git a/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/ShellEngineConnConcurrentExecutor.scala b/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/ShellEngineConnConcurrentExecutor.scala
deleted file mode 100644
index b66353ca0..000000000
--- a/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/ShellEngineConnConcurrentExecutor.scala
+++ /dev/null
@@ -1,348 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.linkis.manager.engineplugin.shell.executor
-
-import org.apache.linkis.common.utils.{Logging, OverloadUtils, Utils}
-import org.apache.linkis.engineconn.computation.executor.execute.{
-  ConcurrentComputationExecutor,
-  EngineExecutionContext
-}
-import org.apache.linkis.engineconn.core.EngineConnObject
-import org.apache.linkis.governance.common.utils.GovernanceUtils
-import org.apache.linkis.manager.common.entity.resource.{
-  CommonNodeResource,
-  LoadResource,
-  NodeResource
-}
-import org.apache.linkis.manager.engineplugin.common.util.NodeResourceUtils
-import org.apache.linkis.manager.engineplugin.shell.common.ShellEngineConnPluginConst
-import org.apache.linkis.manager.engineplugin.shell.conf.ShellEngineConnConf
-import org.apache.linkis.manager.engineplugin.shell.exception.ShellCodeErrorException
-import org.apache.linkis.manager.label.entity.Label
-import org.apache.linkis.protocol.engine.JobProgressInfo
-import org.apache.linkis.rpc.Sender
-import org.apache.linkis.scheduler.executer.{
-  ErrorExecuteResponse,
-  ExecuteResponse,
-  SuccessExecuteResponse
-}
-
-import org.apache.commons.lang3.StringUtils
-
-import java.io.{BufferedReader, File, InputStreamReader}
-import java.util
-import java.util.concurrent.{ConcurrentHashMap, CountDownLatch}
-import java.util.concurrent.atomic.AtomicBoolean
-
-import scala.collection.mutable.ArrayBuffer
-import scala.concurrent.ExecutionContextExecutorService
-
-class ShellEngineConnConcurrentExecutor(id: Int, maxRunningNumber: Int)
-    extends ConcurrentComputationExecutor
-    with Logging {
-
-  private var engineExecutionContext: EngineExecutionContext = _
-
-  private val executorLabels: util.List[Label[_]] = new util.ArrayList[Label[_]]()
-
-  private val shellECTaskInfoCache: util.Map[String, ShellECTaskInfo] =
-    new ConcurrentHashMap[String, ShellECTaskInfo]()
-
-  private implicit val logAsyncService: ExecutionContextExecutorService =
-    Utils.newCachedExecutionContext(
-      ShellEngineConnConf.LOG_SERVICE_MAX_THREAD_SIZE,
-      "ShelLogService-Thread-"
-    )
-
-  override def init(): Unit = {
-    logger.info(s"Ready to change engine state!")
-    super.init
-  }
-
-  override def executeCompletely(
-      engineExecutionContext: EngineExecutionContext,
-      code: String,
-      completedLine: String
-  ): ExecuteResponse = {
-    val newcode = completedLine + code
-    logger.debug("newcode is " + newcode)
-    executeLine(engineExecutionContext, newcode)
-  }
-
-  override def executeLine(
-      engineExecutionContext: EngineExecutionContext,
-      code: String
-  ): ExecuteResponse = {
-
-    if (null != engineExecutionContext) {
-      this.engineExecutionContext = engineExecutionContext
-      logger.info("Shell executor reset new engineExecutionContext!")
-    }
-
-    if (engineExecutionContext.getJobId.isEmpty) {
-      return ErrorExecuteResponse("taskID is null", null)
-    }
-
-    val taskId = engineExecutionContext.getJobId.get
-    var bufferedReader: BufferedReader = null
-    var errorsReader: BufferedReader = null
-
-    val completed = new AtomicBoolean(false)
-    var errReaderThread: ReaderThread = null
-    var inputReaderThread: ReaderThread = null
-
-    try {
-      engineExecutionContext.appendStdout(s"$getId >> ${code.trim}")
-
-      val argsArr =
-        if (
-            engineExecutionContext.getTotalParagraph == 1 &&
-            engineExecutionContext.getProperties != null &&
-            engineExecutionContext.getProperties.containsKey(
-              ShellEngineConnPluginConst.RUNTIME_ARGS_KEY
-            )
-        ) {
-          Utils.tryCatch {
-            val argsList = engineExecutionContext.getProperties
-              .get(ShellEngineConnPluginConst.RUNTIME_ARGS_KEY)
-              .asInstanceOf[util.ArrayList[String]]
-            logger.info(
-              "Will execute shell task with user-specified arguments: \'" + argsList
-                .toArray(new Array[String](argsList.size()))
-                .mkString("\' \'") + "\'"
-            )
-            argsList.toArray(new Array[String](argsList.size()))
-          } { t =>
-            logger.warn(
-              "Cannot read user-input shell arguments. Will execute shell task without them.",
-              t
-            )
-            null
-          }
-        } else {
-          null
-        }
-
-      val workingDirectory =
-        if (
-            engineExecutionContext.getTotalParagraph == 1 &&
-            engineExecutionContext.getProperties != null &&
-            engineExecutionContext.getProperties.containsKey(
-              ShellEngineConnPluginConst.SHELL_RUNTIME_WORKING_DIRECTORY
-            )
-        ) {
-          Utils.tryCatch {
-            val wdStr = engineExecutionContext.getProperties
-              .get(ShellEngineConnPluginConst.SHELL_RUNTIME_WORKING_DIRECTORY)
-              .asInstanceOf[String]
-            if (isExecutePathExist(wdStr)) {
-              logger.info(
-                "Will execute shell task under user-specified working-directory: \'" + wdStr
-              )
-              wdStr
-            } else {
-              logger.warn(
-                "User-specified working-directory: \'" + wdStr + "\' does not exist or user does not have access permission. " +
-                  "Will execute shell task under default working-directory. Please contact the administrator!"
-              )
-              null
-            }
-          } { t =>
-            logger.warn(
-              "Cannot read user-input working-directory. Will execute shell task under default working-directory.",
-              t
-            )
-            null
-          }
-        } else {
-          null
-        }
-
-      val generatedCode = if (argsArr == null || argsArr.isEmpty) {
-        generateRunCode(code)
-      } else {
-        generateRunCodeWithArgs(code, argsArr)
-      }
-
-      val processBuilder: ProcessBuilder = new ProcessBuilder(generatedCode: _*)
-      if (StringUtils.isNotBlank(workingDirectory)) {
-        processBuilder.directory(new File(workingDirectory))
-      }
-
-      processBuilder.redirectErrorStream(false)
-      val extractor = new YarnAppIdExtractor
-      val process = processBuilder.start()
-      bufferedReader = new BufferedReader(new InputStreamReader(process.getInputStream))
-      errorsReader = new BufferedReader(new InputStreamReader(process.getErrorStream))
-      // add task id and task Info cache
-      shellECTaskInfoCache.put(taskId, ShellECTaskInfo(taskId, process, extractor))
-
-      val counter: CountDownLatch = new CountDownLatch(2)
-      inputReaderThread =
-        new ReaderThread(engineExecutionContext, bufferedReader, extractor, true, counter)
-      errReaderThread =
-        new ReaderThread(engineExecutionContext, errorsReader, extractor, false, counter)
-
-      logAsyncService.execute(inputReaderThread)
-      logAsyncService.execute(errReaderThread)
-
-      val exitCode = process.waitFor()
-      counter.await()
-
-      completed.set(true)
-
-      if (exitCode != 0) {
-        ErrorExecuteResponse("run shell failed", ShellCodeErrorException())
-      } else SuccessExecuteResponse()
-
-    } catch {
-      case e: Exception =>
-        logger.error("Execute shell code failed, reason:", e)
-        ErrorExecuteResponse("run shell failed", e)
-    } finally {
-      if (null != errorsReader) {
-        inputReaderThread.onDestroy()
-      }
-      if (null != inputReaderThread) {
-        errReaderThread.onDestroy()
-      }
-      shellECTaskInfoCache.remove(taskId)
-    }
-  }
-
-  private def isExecutePathExist(executePath: String): Boolean = {
-    val etlHomeDir: File = new File(executePath)
-    (etlHomeDir.exists() && etlHomeDir.isDirectory)
-  }
-
-  private def generateRunCode(code: String): Array[String] = {
-    Array("sh", "-c", code)
-  }
-
-  private def generateRunCodeWithArgs(code: String, args: Array[String]): Array[String] = {
-    Array("sh", "-c", "echo \"dummy " + args.mkString(" ") + "\" | xargs sh -c \'" + code + "\'")
-  }
-
-  override def getId(): String = Sender.getThisServiceInstance.getInstance + "_" + id
-
-  override def getProgressInfo(taskID: String): Array[JobProgressInfo] = {
-    val jobProgressInfo = new ArrayBuffer[JobProgressInfo]()
-    if (null == this.engineExecutionContext) {
-      return jobProgressInfo.toArray
-    }
-    if (0.0f == progress(taskID)) {
-      jobProgressInfo += JobProgressInfo(engineExecutionContext.getJobId.getOrElse(""), 1, 1, 0, 0)
-    } else {
-      jobProgressInfo += JobProgressInfo(engineExecutionContext.getJobId.getOrElse(""), 1, 0, 0, 1)
-    }
-    jobProgressInfo.toArray
-  }
-
-  override def progress(taskID: String): Float = {
-    if (null != this.engineExecutionContext) {
-      this.engineExecutionContext.getCurrentParagraph / this.engineExecutionContext.getTotalParagraph
-        .asInstanceOf[Float]
-    } else {
-      0.0f
-    }
-  }
-
-  override def supportCallBackLogs(): Boolean = {
-    // todo
-    true
-  }
-
-  override def requestExpectedResource(expectedResource: NodeResource): NodeResource = {
-    null
-  }
-
-  override def getCurrentNodeResource(): NodeResource = {
-    val resource = new CommonNodeResource
-    resource.setUsedResource(
-      NodeResourceUtils
-        .applyAsLoadInstanceResource(EngineConnObject.getEngineCreationContext.getOptions)
-    )
-    resource
-  }
-
-  override def getExecutorLabels(): util.List[Label[_]] = executorLabels
-
-  override def setExecutorLabels(labels: util.List[Label[_]]): Unit = {
-    if (null != labels) {
-      executorLabels.clear()
-      executorLabels.addAll(labels)
-    }
-  }
-
-  override def killTask(taskID: String): Unit = {
-    val shellECTaskInfo = shellECTaskInfoCache.remove(taskID)
-    if (null == shellECTaskInfo) {
-      return
-    }
-    /*
-      Kill sub-processes
-     */
-    val pid = getPid(shellECTaskInfo.process)
-    GovernanceUtils.killProcess(String.valueOf(pid), s"kill task $taskID process", false)
-    /*
-      Kill yarn-applications
-     */
-    val yarnAppIds = shellECTaskInfo.yarnAppIdExtractor.getExtractedYarnAppIds()
-    GovernanceUtils.killYarnJobApp(yarnAppIds)
-    logger.info(
-      s"Finished kill yarn app ids in the engine of (${getId()}). The yarn app ids are ${yarnAppIds}"
-    )
-    super.killTask(taskID)
-
-  }
-
-  private def getPid(process: Process): Int = {
-    Utils.tryCatch {
-      val clazz = Class.forName("java.lang.UNIXProcess");
-      val field = clazz.getDeclaredField("pid");
-      field.setAccessible(true);
-      field.get(process).asInstanceOf[Int]
-    } { t =>
-      logger.warn("Failed to acquire pid for shell process")
-      -1
-    }
-  }
-
-  override def close(): Unit = {
-    Utils.tryCatch {
-      killAll()
-      logAsyncService.shutdown()
-    } { t: Throwable =>
-      logger.error(s"Shell ec failed to close ", t)
-    }
-    super.close()
-  }
-
-  override def killAll(): Unit = {
-    val iterator = shellECTaskInfoCache.values().iterator()
-    while (iterator.hasNext) {
-      val shellECTaskInfo = iterator.next()
-      Utils.tryAndWarn(killTask(shellECTaskInfo.taskId))
-    }
-  }
-
-  override def getConcurrentLimit: Int = {
-    maxRunningNumber
-  }
-
-}
diff --git a/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/ShellEngineConnExecutor.scala b/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/ShellEngineConnExecutor.scala
deleted file mode 100644
index c9f320625..000000000
--- a/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/ShellEngineConnExecutor.scala
+++ /dev/null
@@ -1,319 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.linkis.manager.engineplugin.shell.executor
-
-import org.apache.linkis.common.utils.{Logging, Utils}
-import org.apache.linkis.engineconn.computation.executor.execute.{
-  ComputationExecutor,
-  EngineExecutionContext
-}
-import org.apache.linkis.engineconn.core.EngineConnObject
-import org.apache.linkis.governance.common.utils.GovernanceUtils
-import org.apache.linkis.manager.common.entity.resource.{
-  CommonNodeResource,
-  LoadInstanceResource,
-  NodeResource
-}
-import org.apache.linkis.manager.engineplugin.common.conf.EngineConnPluginConf
-import org.apache.linkis.manager.engineplugin.common.util.NodeResourceUtils
-import org.apache.linkis.manager.engineplugin.shell.common.ShellEngineConnPluginConst
-import org.apache.linkis.manager.engineplugin.shell.exception.ShellCodeErrorException
-import org.apache.linkis.manager.label.entity.Label
-import org.apache.linkis.protocol.engine.JobProgressInfo
-import org.apache.linkis.rpc.Sender
-import org.apache.linkis.scheduler.executer.{
-  ErrorExecuteResponse,
-  ExecuteResponse,
-  SuccessExecuteResponse
-}
-
-import org.apache.commons.io.IOUtils
-import org.apache.commons.lang3.StringUtils
-
-import java.io.{BufferedReader, File, FileReader, InputStreamReader, IOException}
-import java.util
-import java.util.concurrent.CountDownLatch
-import java.util.concurrent.atomic.AtomicBoolean
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable.ArrayBuffer
-
-class ShellEngineConnExecutor(id: Int) extends ComputationExecutor with Logging {
-
-  private var engineExecutionContext: EngineExecutionContext = _
-
-  private val executorLabels: util.List[Label[_]] = new util.ArrayList[Label[_]]()
-
-  private var process: Process = _
-
-  private var extractor: YarnAppIdExtractor = _
-
-  override def init(): Unit = {
-    logger.info(s"Ready to change engine state!")
-    super.init
-  }
-
-  override def executeCompletely(
-      engineExecutionContext: EngineExecutionContext,
-      code: String,
-      completedLine: String
-  ): ExecuteResponse = {
-    val newcode = completedLine + code
-    logger.debug("newcode is " + newcode)
-    executeLine(engineExecutionContext, newcode)
-  }
-
-  override def executeLine(
-      engineExecutionContext: EngineExecutionContext,
-      code: String
-  ): ExecuteResponse = {
-
-    if (null != engineExecutionContext) {
-      this.engineExecutionContext = engineExecutionContext
-      logger.info("Shell executor reset new engineExecutionContext!")
-    }
-
-    var bufferedReader: BufferedReader = null
-    var errorsReader: BufferedReader = null
-
-    val completed = new AtomicBoolean(false)
-    var errReaderThread: ReaderThread = null
-    var inputReaderThread: ReaderThread = null
-
-    try {
-      engineExecutionContext.appendStdout(s"$getId >> ${code.trim}")
-
-      val argsArr =
-        if (
-            engineExecutionContext.getTotalParagraph == 1 &&
-            engineExecutionContext.getProperties != null &&
-            engineExecutionContext.getProperties.containsKey(
-              ShellEngineConnPluginConst.RUNTIME_ARGS_KEY
-            )
-        ) {
-          Utils.tryCatch {
-            val argsList = engineExecutionContext.getProperties
-              .get(ShellEngineConnPluginConst.RUNTIME_ARGS_KEY)
-              .asInstanceOf[util.ArrayList[String]]
-            logger.info(
-              "Will execute shell task with user-specified arguments: \'" + argsList
-                .toArray(new Array[String](argsList.size()))
-                .mkString("\' \'") + "\'"
-            )
-            argsList.toArray(new Array[String](argsList.size()))
-          } { t =>
-            logger.warn(
-              "Cannot read user-input shell arguments. Will execute shell task without them.",
-              t
-            )
-            null
-          }
-        } else {
-          null
-        }
-
-      val workingDirectory =
-        if (
-            engineExecutionContext.getTotalParagraph == 1 &&
-            engineExecutionContext.getProperties != null &&
-            engineExecutionContext.getProperties.containsKey(
-              ShellEngineConnPluginConst.SHELL_RUNTIME_WORKING_DIRECTORY
-            )
-        ) {
-          Utils.tryCatch {
-            val wdStr = engineExecutionContext.getProperties
-              .get(ShellEngineConnPluginConst.SHELL_RUNTIME_WORKING_DIRECTORY)
-              .asInstanceOf[String]
-            if (isExecutePathExist(wdStr)) {
-              logger.info(
-                "Will execute shell task under user-specified working-directory: \'" + wdStr
-              )
-              wdStr
-            } else {
-              logger.warn(
-                "User-specified working-directory: \'" + wdStr + "\' does not exist or user does not have access permission. " +
-                  "Will execute shell task under default working-directory. Please contact the administrator!"
-              )
-              null
-            }
-          } { t =>
-            logger.warn(
-              "Cannot read user-input working-directory. Will execute shell task under default working-directory.",
-              t
-            )
-            null
-          }
-        } else {
-          null
-        }
-
-      val generatedCode = if (argsArr == null || argsArr.isEmpty) {
-        generateRunCode(code)
-      } else {
-        generateRunCodeWithArgs(code, argsArr)
-      }
-
-      val processBuilder: ProcessBuilder = new ProcessBuilder(generatedCode: _*)
-      if (StringUtils.isNotBlank(workingDirectory)) {
-        processBuilder.directory(new File(workingDirectory))
-      }
-
-      processBuilder.redirectErrorStream(false)
-      extractor = new YarnAppIdExtractor
-      process = processBuilder.start()
-
-      bufferedReader = new BufferedReader(new InputStreamReader(process.getInputStream))
-      errorsReader = new BufferedReader(new InputStreamReader(process.getErrorStream))
-      val counter: CountDownLatch = new CountDownLatch(2)
-      inputReaderThread =
-        new ReaderThread(engineExecutionContext, bufferedReader, extractor, true, counter)
-      errReaderThread =
-        new ReaderThread(engineExecutionContext, errorsReader, extractor, false, counter)
-
-      inputReaderThread.start()
-      errReaderThread.start()
-
-      val exitCode = process.waitFor()
-      counter.await()
-
-      completed.set(true)
-
-      if (exitCode != 0) {
-        ErrorExecuteResponse("run shell failed", ShellCodeErrorException())
-      } else SuccessExecuteResponse()
-    } catch {
-      case e: Exception =>
-        logger.error("Execute shell code failed, reason:", e)
-        ErrorExecuteResponse("run shell failed", e)
-    } finally {
-      if (null != errorsReader) {
-        inputReaderThread.onDestroy()
-      }
-      if (null != inputReaderThread) {
-        errReaderThread.onDestroy()
-      }
-      IOUtils.closeQuietly(bufferedReader)
-      IOUtils.closeQuietly(errorsReader)
-    }
-  }
-
-  private def isExecutePathExist(executePath: String): Boolean = {
-    val etlHomeDir: File = new File(executePath)
-    (etlHomeDir.exists() && etlHomeDir.isDirectory())
-  }
-
-  private def generateRunCode(code: String): Array[String] = {
-    Array("sh", "-c", code)
-  }
-
-  private def generateRunCodeWithArgs(code: String, args: Array[String]): Array[String] = {
-    Array("sh", "-c", "echo \"dummy " + args.mkString(" ") + "\" | xargs sh -c \'" + code + "\'")
-  }
-
-  override def getId(): String = Sender.getThisServiceInstance.getInstance + "_" + id
-
-  override def getProgressInfo(taskID: String): Array[JobProgressInfo] = {
-    val jobProgressInfo = new ArrayBuffer[JobProgressInfo]()
-    if (null == this.engineExecutionContext) {
-      return jobProgressInfo.toArray
-    }
-    if (0.0f == progress(taskID)) {
-      jobProgressInfo += JobProgressInfo(engineExecutionContext.getJobId.getOrElse(""), 1, 1, 0, 0)
-    } else {
-      jobProgressInfo += JobProgressInfo(engineExecutionContext.getJobId.getOrElse(""), 1, 0, 0, 1)
-    }
-    jobProgressInfo.toArray
-  }
-
-  override def progress(taskID: String): Float = {
-    if (null != this.engineExecutionContext) {
-      this.engineExecutionContext.getCurrentParagraph / this.engineExecutionContext.getTotalParagraph
-        .asInstanceOf[Float]
-    } else {
-      0.0f
-    }
-  }
-
-  override def supportCallBackLogs(): Boolean = {
-    // todo
-    true
-  }
-
-  override def requestExpectedResource(expectedResource: NodeResource): NodeResource = {
-    null
-  }
-
-  override def getCurrentNodeResource(): NodeResource = {
-    val resource = new CommonNodeResource
-    resource.setUsedResource(
-      NodeResourceUtils
-        .applyAsLoadInstanceResource(EngineConnObject.getEngineCreationContext.getOptions)
-    )
-    resource
-  }
-
-  override def getExecutorLabels(): util.List[Label[_]] = executorLabels
-
-  override def setExecutorLabels(labels: util.List[Label[_]]): Unit = {
-    if (null != labels) {
-      executorLabels.clear()
-      executorLabels.addAll(labels)
-    }
-  }
-
-  override def killTask(taskID: String): Unit = {
-    /*
-      Kill sub-processes
-     */
-    val pid = getPid(process)
-    GovernanceUtils.killProcess(String.valueOf(pid), s"kill task $taskID process", false)
-    /*
-      Kill yarn-applications
-     */
-    val yarnAppIds = extractor.getExtractedYarnAppIds()
-    GovernanceUtils.killYarnJobApp(yarnAppIds)
-    logger.info(s"Finished kill yarn app ids in the engine of (${getId()}).}")
-    super.killTask(taskID)
-
-  }
-
-  private def getPid(process: Process): Int = {
-    Utils.tryCatch {
-      val clazz = Class.forName("java.lang.UNIXProcess");
-      val field = clazz.getDeclaredField("pid");
-      field.setAccessible(true);
-      field.get(process).asInstanceOf[Int]
-    } { t =>
-      logger.warn("Failed to acquire pid for shell process")
-      -1
-    }
-  }
-
-  override def close(): Unit = {
-    try {
-      process.destroy()
-    } catch {
-      case e: Exception =>
-        logger.error(s"kill process ${process.toString} failed ", e)
-      case t: Throwable =>
-        logger.error(s"kill process ${process.toString} failed ", t)
-    }
-    super.close()
-  }
-
-}
diff --git a/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/YarnAppIdExtractor.scala b/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/YarnAppIdExtractor.scala
deleted file mode 100644
index 2f5f79663..000000000
--- a/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/YarnAppIdExtractor.scala
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.linkis.manager.engineplugin.shell.executor
-
-import org.apache.linkis.common.utils.Logging
-import org.apache.linkis.engineconn.common.conf.EngineConnConf
-
-import org.apache.commons.lang3.StringUtils
-
-import java.io._
-import java.util
-import java.util.Collections
-import java.util.regex.Pattern
-
-class YarnAppIdExtractor extends Logging {
-
-  private val appIdList: util.Set[String] = Collections.synchronizedSet(new util.HashSet[String]())
-
-  private val regex = EngineConnConf.SPARK_ENGINE_CONN_YARN_APP_ID_PARSE_REGEX.getValue
-  private val pattern = Pattern.compile(regex)
-
-  def appendLineToExtractor(content: String): Unit = {
-    if (StringUtils.isBlank(content)) return
-    val yarnAppIDMatcher = pattern.matcher(content)
-    if (yarnAppIDMatcher.find) {
-      val yarnAppID = yarnAppIDMatcher.group(2)
-      appIdList.add(yarnAppID)
-    }
-  }
-
-  private def doExtractYarnAppId(content: String): Array[String] = {
-
-    if (StringUtils.isBlank(content)) return new Array[String](0)
-    // spark: Starting|Submitted|Activating.{1,100}(application_\d{13}_\d+)
-    // sqoop, importtsv: Submitted application application_1609166102854_970911
-
-    val stringReader = new StringReader(content)
-
-    val reader = new BufferedReader(stringReader)
-
-    val ret = new util.ArrayList[String]
-
-    var line = reader.readLine()
-    while ({
-      line != null
-    }) { // match application_xxx_xxx
-      val mApp = pattern.matcher(line)
-      if (mApp.find) {
-        val candidate1 = mApp.group(2)
-        if (!ret.contains(candidate1)) {
-          ret.add(candidate1)
-        }
-      }
-      line = reader.readLine
-    }
-    stringReader.close()
-    ret.toArray(new Array[String](ret.size()))
-  }
-
-  def getExtractedYarnAppIds(): util.List[String] = {
-    appIdList.synchronized {
-      new util.ArrayList[String](appIdList)
-    }
-  }
-
-}
diff --git a/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/factory/ShellEngineConnFactory.scala b/linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/factory/ShellEngineConnFactory.scala
old mode 100644
new mode 100755
diff --git a/linkis-engineconn-plugins/shell/src/test/scala/org/apache/linkis/manager/engineplugin/shell/TestShellEngineConnPlugin.scala b/linkis-engineconn-plugins/shell/src/test/java/org/apache/linkis/manager/engineplugin/shell/TestShellEngineConnPlugin.java
similarity index 75%
rename from linkis-engineconn-plugins/shell/src/test/scala/org/apache/linkis/manager/engineplugin/shell/TestShellEngineConnPlugin.scala
rename to linkis-engineconn-plugins/shell/src/test/java/org/apache/linkis/manager/engineplugin/shell/TestShellEngineConnPlugin.java
index 228e5312f..34ce7a671 100644
--- a/linkis-engineconn-plugins/shell/src/test/scala/org/apache/linkis/manager/engineplugin/shell/TestShellEngineConnPlugin.scala
+++ b/linkis-engineconn-plugins/shell/src/test/java/org/apache/linkis/manager/engineplugin/shell/TestShellEngineConnPlugin.java
@@ -15,19 +15,19 @@
  * limitations under the License.
  */
 
-package org.apache.linkis.manager.engineplugin.shell
+package org.apache.linkis.manager.engineplugin.shell;
 
-import org.junit.jupiter.api.{Assertions, Test}
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
 
-class TestShellEngineConnPlugin {
+public class TestShellEngineConnPlugin {
 
   @Test
-  def testShellEngineConnExecutor: Unit = {
-    val shellEngineConnPlugin = new ShellEngineConnPlugin
-    Assertions.assertNotNull(shellEngineConnPlugin.getEngineConnFactory)
-    Assertions.assertNotNull(shellEngineConnPlugin.getEngineConnLaunchBuilder)
-    Assertions.assertNotNull(shellEngineConnPlugin.getEngineResourceFactory)
-    Assertions.assertNotNull(shellEngineConnPlugin.getDefaultLabels)
+  public void testShellEngineConnExecutor() {
+    ShellEngineConnPlugin shellEngineConnPlugin = new ShellEngineConnPlugin();
+    Assertions.assertNotNull(shellEngineConnPlugin.getEngineConnFactory());
+    Assertions.assertNotNull(shellEngineConnPlugin.getEngineConnLaunchBuilder());
+    Assertions.assertNotNull(shellEngineConnPlugin.getEngineResourceFactory());
+    Assertions.assertNotNull(shellEngineConnPlugin.getDefaultLabels());
   }
-
 }
diff --git a/linkis-engineconn-plugins/shell/src/test/scala/org/apache/linkis/manager/engineplugin/shell/common/TestShellEngineConnPluginConst.scala b/linkis-engineconn-plugins/shell/src/test/java/org/apache/linkis/manager/engineplugin/shell/common/TestShellEngineConnPluginConst.java
similarity index 70%
rename from linkis-engineconn-plugins/shell/src/test/scala/org/apache/linkis/manager/engineplugin/shell/common/TestShellEngineConnPluginConst.scala
rename to linkis-engineconn-plugins/shell/src/test/java/org/apache/linkis/manager/engineplugin/shell/common/TestShellEngineConnPluginConst.java
index 59f00e8ff..36444cf44 100644
--- a/linkis-engineconn-plugins/shell/src/test/scala/org/apache/linkis/manager/engineplugin/shell/common/TestShellEngineConnPluginConst.scala
+++ b/linkis-engineconn-plugins/shell/src/test/java/org/apache/linkis/manager/engineplugin/shell/common/TestShellEngineConnPluginConst.java
@@ -15,19 +15,18 @@
  * limitations under the License.
  */
 
-package org.apache.linkis.manager.engineplugin.shell.common
+package org.apache.linkis.manager.engineplugin.shell.common;
 
-import org.junit.jupiter.api.{Assertions, Test}
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
 
-class TestShellEngineConnPluginConst {
+public class TestShellEngineConnPluginConst {
 
   @Test
-  def testShellEngineConnPluginConst: Unit = {
-    Assertions.assertEquals("extraArguments", ShellEngineConnPluginConst.RUNTIME_ARGS_KEY)
+  public void testShellEngineConnPluginConst() {
+    Assertions.assertEquals("extraArguments", ShellEngineConnPluginConst.RUNTIME_ARGS_KEY);
     Assertions.assertEquals(
-      "wds.linkis.shell.runtime.working.directory",
-      ShellEngineConnPluginConst.SHELL_RUNTIME_WORKING_DIRECTORY
-    )
+        "wds.linkis.shell.runtime.working.directory",
+        ShellEngineConnPluginConst.SHELL_RUNTIME_WORKING_DIRECTORY);
   }
-
 }
diff --git a/linkis-engineconn-plugins/shell/src/test/scala/org/apache/linkis/manager/engineplugin/shell/exception/TestNoCorrectUserException.scala b/linkis-engineconn-plugins/shell/src/test/java/org/apache/linkis/manager/engineplugin/shell/exception/TestNoCorrectUserException.java
similarity index 67%
rename from linkis-engineconn-plugins/shell/src/test/scala/org/apache/linkis/manager/engineplugin/shell/exception/TestNoCorrectUserException.scala
rename to linkis-engineconn-plugins/shell/src/test/java/org/apache/linkis/manager/engineplugin/shell/exception/TestNoCorrectUserException.java
index 50ba61bb0..c3cc8e682 100644
--- a/linkis-engineconn-plugins/shell/src/test/scala/org/apache/linkis/manager/engineplugin/shell/exception/TestNoCorrectUserException.scala
+++ b/linkis-engineconn-plugins/shell/src/test/java/org/apache/linkis/manager/engineplugin/shell/exception/TestNoCorrectUserException.java
@@ -15,24 +15,20 @@
  * limitations under the License.
  */
 
-package org.apache.linkis.manager.engineplugin.shell.exception
+package org.apache.linkis.manager.engineplugin.shell.exception;
 
-import org.junit.jupiter.api.{Assertions, Test}
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
 
-class TestNoCorrectUserException {
+public class TestNoCorrectUserException {
 
   @Test
-  def testNoCorrectUserException: Unit = {
-
-    val exception = NoCorrectUserException
-    Assertions.assertNotNull(exception)
+  public void testNoCorrectUserException() {
+    Assertions.assertNotNull(new NoCorrectUserException());
   }
 
   @Test
-  def testShellCodeErrorException: Unit = {
-
-    val exception = ShellCodeErrorException
-    Assertions.assertNotNull(exception)
+  public void testShellCodeErrorException() {
+    Assertions.assertNotNull(new ShellCodeErrorException());
   }
-
 }
diff --git a/linkis-engineconn-plugins/shell/src/test/java/org/apache/linkis/manager/engineplugin/shell/executor/TestShellEngineConnExecutor.java b/linkis-engineconn-plugins/shell/src/test/java/org/apache/linkis/manager/engineplugin/shell/executor/TestShellEngineConnExecutor.java
new file mode 100644
index 000000000..d327282b2
--- /dev/null
+++ b/linkis-engineconn-plugins/shell/src/test/java/org/apache/linkis/manager/engineplugin/shell/executor/TestShellEngineConnExecutor.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.manager.engineplugin.shell.executor;
+
+import org.apache.linkis.DataWorkCloudApplication;
+import org.apache.linkis.common.conf.DWCArgumentsParser;
+import org.apache.linkis.common.utils.Utils;
+import org.apache.linkis.engineconn.computation.executor.execute.EngineExecutionContext;
+import org.apache.linkis.scheduler.executer.ExecuteResponse;
+
+import scala.collection.mutable.HashMap;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class TestShellEngineConnExecutor {
+
+  @Test
+  public void testShellEngineConnExecutor() throws ReflectiveOperationException {
+    boolean isWindows = System.getProperty("os.name").startsWith("Windows");
+
+    System.setProperty("wds.linkis.server.version", "v1");
+    System.setProperty("HADOOP_CONF_DIR", "./");
+    System.setProperty(
+        "wds.linkis.engineconn.plugin.default.class",
+        "org.apache.linkis.manager.engineplugin.shell.ShellEngineConnPlugin");
+
+    HashMap<String, String> map = new HashMap<>();
+    map.put("spring.mvc.servlet.path", "/api/rest_j/v1");
+    map.put("server.port", "26380");
+    map.put("spring.application.name", "shellEngineExecutor");
+    map.put("eureka.client.register-with-eureka", "false");
+    map.put("eureka.client.fetch-registry", "false");
+    DataWorkCloudApplication.main(DWCArgumentsParser.formatSpringOptions(map.toMap(null)));
+    ShellEngineConnExecutor shellEngineConnExecutor = new ShellEngineConnExecutor(1);
+    shellEngineConnExecutor.init();
+    Assertions.assertTrue(shellEngineConnExecutor.isEngineInitialized());
+    if (!isWindows) {
+      EngineExecutionContext engineExecutionContext =
+          new EngineExecutionContext(shellEngineConnExecutor, Utils.getJvmUser());
+      ExecuteResponse response = shellEngineConnExecutor.executeLine(engineExecutionContext, "id");
+      Assertions.assertNotNull(response);
+      shellEngineConnExecutor.close();
+    }
+  }
+}
diff --git a/linkis-engineconn-plugins/shell/src/test/scala/org/apache/linkis/manager/engineplugin/shell/executor/TestShellEngineConnExecutor.scala b/linkis-engineconn-plugins/shell/src/test/scala/org/apache/linkis/manager/engineplugin/shell/executor/TestShellEngineConnExecutor.scala
deleted file mode 100644
index 833db129f..000000000
--- a/linkis-engineconn-plugins/shell/src/test/scala/org/apache/linkis/manager/engineplugin/shell/executor/TestShellEngineConnExecutor.scala
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.linkis.manager.engineplugin.shell.executor
-
-import org.apache.linkis.DataWorkCloudApplication
-import org.apache.linkis.common.conf.DWCArgumentsParser
-import org.apache.linkis.common.utils.Utils
-import org.apache.linkis.engineconn.computation.executor.execute.EngineExecutionContext
-
-import scala.collection.mutable
-
-import org.junit.jupiter.api.{Assertions, Test}
-
-class TestShellEngineConnExecutor {
-
-  @Test
-  def testShellEngineConnExecutor: Unit = {
-    val isWindows = System.getProperty("os.name").startsWith("Windows")
-
-    System.setProperty("wds.linkis.server.version", "v1")
-    System.setProperty("HADOOP_CONF_DIR", "./")
-    System.setProperty(
-      "wds.linkis.engineconn.plugin.default.class",
-      "org.apache.linkis.manager.engineplugin.shell.ShellEngineConnPlugin"
-    )
-    val map = new mutable.HashMap[String, String]()
-    map.put("spring.mvc.servlet.path", "/api/rest_j/v1")
-    map.put("server.port", "26380")
-    map.put("spring.application.name", "shellEngineExecutor")
-    map.put("eureka.client.register-with-eureka", "false")
-    map.put("eureka.client.fetch-registry", "false")
-    DataWorkCloudApplication.main(DWCArgumentsParser.formatSpringOptions(map.toMap))
-    val shellEngineConnExecutor = new ShellEngineConnExecutor(1)
-    shellEngineConnExecutor.init()
-    Assertions.assertTrue(shellEngineConnExecutor.isEngineInitialized)
-    if (!isWindows) {
-
-      val engineExecutionContext =
-        new EngineExecutionContext(shellEngineConnExecutor, Utils.getJvmUser)
-      val response = shellEngineConnExecutor.executeLine(engineExecutionContext, "id")
-      Assertions.assertNotNull(response)
-      shellEngineConnExecutor.close()
-    }
-
-  }
-
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org